2016-02-27 160 views
0

我試圖使用Rust-Websocket創建一個簡單的聊天室,其中多個人可以互相交談。使用rust-websocket聊天

我看了一下例子,'server.rs'和'websockets.html'對我來說看起來像一個體面的起點。所以我只是嘗試啓動並從網絡連接。一切正常,但我只能與自己溝通,而不能與其他連接進行溝通(因爲它直接將消息發送回sender而不是每個連接)。

所以我試圖得到一個載體,所有senders/clients所以我可以遍歷它們併發送消息給每一個,但這似乎是有問題的。我無法溝通senderclient,因爲它不是線程安全的,我也不能複製其中的任何一個。

我不確定我是否不明白整個借款是100%還是不打算做這樣的交叉連接通信。

server.rs:
https://github.com/cyderize/rust-websocket/blob/master/examples/server.rs

websockets.html:
https://github.com/cyderize/rust-websocket/blob/master/examples/websockets.html

我可能會從錯誤的方向接近這一點。與所有其他線程共享收到的消息可能會更容易。我想到了這一點,但我能想到的唯一事情就是使用channels從線程內部向外部發送消息。有什麼方法可以在線程之間直接傳播消息嗎?我所需要做的就是從一個線程發送一個字符串到另一個線程。

+0

我想類似的東西,放棄了。 AFAICS的問題是接收器對象沒有名爲'try_recv_message'的方法。這意味着,當您開始查找郵件時,服務器將會阻止,直到您實際收到郵件。 https://github.com/cyderize/rust-websocket/blob/2f80d4c4889602d63cc745aa18e3fdd4ae71eb8a/src/ws/receiver.rs 該項目命名爲鐵鏽聊天使用不同的WebSocket庫,以及他們的工作圍繞它通過使用多播。 https://github.com/nbaksalyar/rust-chat/blob/master/src/main.rs – nielsle

+0

謝謝!我會研究它。但我不能相信這對於rust-websocket來說會很難,因爲即使他們的例子是使用網絡聊天。它看起來像他們打算讓用戶擴展他們的例子,因爲單一客戶端聊天對我來說沒有多大意義。 – DropOfBlood

+0

我很可能錯過了一些東西。如果您找到解決方案,請發佈。 – nielsle

回答

0

所以這不像人們想象的那麼直截了當。

基本上我使用了一個調度程序線程,它可以像所有連接客戶端的控制中心那樣工作。因此,無論何時客戶端收到一條消息,都會將其發送給調度員,然後將消息分發給每個連接的客戶端。

我還必須在另一個線程中接收消息,因爲在rust-websocket中沒有非阻塞方式接收消息。然後我可以使用永久循環來檢查從websocket和調度程序接收到的新消息。

這裏是我的代碼是如何模樣到底:

extern crate websocket; 

use std::str; 
use std::sync::{Arc, Mutex}; 
use std::sync::mpsc; 
use std::thread; 

use websocket::{Server, Message, Sender, Receiver}; 
use websocket::header::WebSocketProtocol; 
use websocket::message::Type; 


fn main() { 
    let server = Server::bind("0.0.0.0:2794").unwrap(); 

    let (dispatcher_tx, dispatcher_rx) = mpsc::channel::<String>(); 
    let client_senders: Arc<Mutex<Vec<mpsc::Sender<String>>>> = Arc::new(Mutex::new(vec![])); 

    // dispatcher thread 
    { 
     let client_senders = client_senders.clone(); 
     thread::spawn(move || { 
      while let Ok(msg) = dispatcher_rx.recv() { 
       for sender in client_senders.lock().unwrap().iter() { 
        sender.send(msg.clone()).unwrap(); 
       } 
      } 
     }); 
    } 

    // client threads 
    for connection in server { 
     let dispatcher = dispatcher_tx.clone(); 
     let (client_tx, client_rx) = mpsc::channel(); 
     client_senders.lock().unwrap().push(client_tx); 

     // Spawn a new thread for each connection. 
     thread::spawn(move || { 
      let request = connection.unwrap().read_request().unwrap(); // Get the request 
      let headers = request.headers.clone(); // Keep the headers so we can check them 

      request.validate().unwrap(); // Validate the request 

      let mut response = request.accept(); // Form a response 

      if let Some(&WebSocketProtocol(ref protocols)) = headers.get() { 
       if protocols.contains(&("rust-websocket".to_string())) { 
        // We have a protocol we want to use 
        response.headers.set(WebSocketProtocol(vec!["rust-websocket".to_string()])); 
       } 
      } 

      let mut client = response.send().unwrap(); // Send the response 

      let ip = client.get_mut_sender() 
       .get_mut() 
       .peer_addr() 
       .unwrap(); 

      println!("Connection from {}", ip); 

      let message: Message = Message::text("SERVER: Connected.".to_string()); 
      client.send_message(&message).unwrap(); 

      let (mut sender, mut receiver) = client.split(); 

      let(tx, rx) = mpsc::channel::<Message>(); 
      thread::spawn(move || { 
       for message in receiver.incoming_messages() { 
        tx.send(message.unwrap()).unwrap(); 
       } 
      }); 

      loop { 
       if let Ok(message) = rx.try_recv() { 
        match message.opcode { 
         Type::Close => { 
          let message = Message::close(); 
          sender.send_message(&message).unwrap(); 
          println!("Client {} disconnected", ip); 
          return; 
         }, 
         Type::Ping => { 
          let message = Message::pong(message.payload); 
          sender.send_message(&message).unwrap(); 
         }, 
         _ => { 
          let payload_bytes = &message.payload; 
          let payload_string = match str::from_utf8(payload_bytes) { 
           Ok(v) => v, 
           Err(e) => panic!("Invalid UTF-8 sequence: {}", e), 
          }; 
          let msg_string = format!("MESSAGE: {}: ", payload_string); 
          dispatcher.send(msg_string).unwrap(); 
         } 
        } 
       } 
       if let Ok(message) = client_rx.try_recv() { 
        let message: Message = Message::text(message); 
        sender.send_message(&message).unwrap(); 
       } 
      } 
     }); 
    } 
} 

http://pastebin.com/H9McWLrH