所以這不像人們想象的那麼直截了當。
基本上我使用了一個調度程序線程,它可以像所有連接客戶端的控制中心那樣工作。因此,無論何時客戶端收到一條消息,都會將其發送給調度員,然後將消息分發給每個連接的客戶端。
我還必須在另一個線程中接收消息,因爲在rust-websocket中沒有非阻塞方式接收消息。然後我可以使用永久循環來檢查從websocket和調度程序接收到的新消息。
這裏是我的代碼是如何模樣到底:
extern crate websocket;
use std::str;
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use websocket::{Server, Message, Sender, Receiver};
use websocket::header::WebSocketProtocol;
use websocket::message::Type;
fn main() {
let server = Server::bind("0.0.0.0:2794").unwrap();
let (dispatcher_tx, dispatcher_rx) = mpsc::channel::<String>();
let client_senders: Arc<Mutex<Vec<mpsc::Sender<String>>>> = Arc::new(Mutex::new(vec![]));
// dispatcher thread
{
let client_senders = client_senders.clone();
thread::spawn(move || {
while let Ok(msg) = dispatcher_rx.recv() {
for sender in client_senders.lock().unwrap().iter() {
sender.send(msg.clone()).unwrap();
}
}
});
}
// client threads
for connection in server {
let dispatcher = dispatcher_tx.clone();
let (client_tx, client_rx) = mpsc::channel();
client_senders.lock().unwrap().push(client_tx);
// Spawn a new thread for each connection.
thread::spawn(move || {
let request = connection.unwrap().read_request().unwrap(); // Get the request
let headers = request.headers.clone(); // Keep the headers so we can check them
request.validate().unwrap(); // Validate the request
let mut response = request.accept(); // Form a response
if let Some(&WebSocketProtocol(ref protocols)) = headers.get() {
if protocols.contains(&("rust-websocket".to_string())) {
// We have a protocol we want to use
response.headers.set(WebSocketProtocol(vec!["rust-websocket".to_string()]));
}
}
let mut client = response.send().unwrap(); // Send the response
let ip = client.get_mut_sender()
.get_mut()
.peer_addr()
.unwrap();
println!("Connection from {}", ip);
let message: Message = Message::text("SERVER: Connected.".to_string());
client.send_message(&message).unwrap();
let (mut sender, mut receiver) = client.split();
let(tx, rx) = mpsc::channel::<Message>();
thread::spawn(move || {
for message in receiver.incoming_messages() {
tx.send(message.unwrap()).unwrap();
}
});
loop {
if let Ok(message) = rx.try_recv() {
match message.opcode {
Type::Close => {
let message = Message::close();
sender.send_message(&message).unwrap();
println!("Client {} disconnected", ip);
return;
},
Type::Ping => {
let message = Message::pong(message.payload);
sender.send_message(&message).unwrap();
},
_ => {
let payload_bytes = &message.payload;
let payload_string = match str::from_utf8(payload_bytes) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
let msg_string = format!("MESSAGE: {}: ", payload_string);
dispatcher.send(msg_string).unwrap();
}
}
}
if let Ok(message) = client_rx.try_recv() {
let message: Message = Message::text(message);
sender.send_message(&message).unwrap();
}
}
});
}
}
http://pastebin.com/H9McWLrH
我想類似的東西,放棄了。 AFAICS的問題是接收器對象沒有名爲'try_recv_message'的方法。這意味着,當您開始查找郵件時,服務器將會阻止,直到您實際收到郵件。 https://github.com/cyderize/rust-websocket/blob/2f80d4c4889602d63cc745aa18e3fdd4ae71eb8a/src/ws/receiver.rs 該項目命名爲鐵鏽聊天使用不同的WebSocket庫,以及他們的工作圍繞它通過使用多播。 https://github.com/nbaksalyar/rust-chat/blob/master/src/main.rs – nielsle
謝謝!我會研究它。但我不能相信這對於rust-websocket來說會很難,因爲即使他們的例子是使用網絡聊天。它看起來像他們打算讓用戶擴展他們的例子,因爲單一客戶端聊天對我來說沒有多大意義。 – DropOfBlood
我很可能錯過了一些東西。如果您找到解決方案,請發佈。 – nielsle