2017-04-15 115 views
2

我正在使用Tokio庫提供的示例,並嘗試使用所有當前活動的TCP連接的向量。最終,我希望能夠通過遍歷它們並向套接字寫入消息來向每個活動連接廣播消息。使用互斥鎖從多個線程併發訪問向量

首先,我試圖在一個線程中打印當前的連接數,同時在另一個線程中接受連接。

爲此,我試圖使用共享向量。我還沒有實施從斷開連接中刪除連接的時間。

// A tiny async echo server with tokio-core 
extern crate futures; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{Future, Stream}; 
use tokio_io::{io, AsyncRead}; 
use tokio_core::net::TcpListener; 
use tokio_core::reactor::Core; 
use std::thread; 
use std::sync::{Arc, Mutex}; 
use std::io::stdout; 
use std::io::Write; 

fn main() { 
    // Create the event loop that will drive this server 
    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 

    // Bind the server's socket 
    let addr = "127.0.0.1:12345".parse().unwrap(); 
    let tcp = TcpListener::bind(&addr, &handle).unwrap(); 

    let mut connections = Arc::new((Mutex::new(Vec::new()))); 

    thread::spawn(move || { 
     //Every 10 seconds print out the current number of connections 
     let mut i; 
     loop {    
      i = connections.lock().unwrap().len(); 
      println!("There are {} connections", i); 
      stdout().flush(); 
      thread::sleep_ms(10000); 
     } 
    }); 



    // Iterate incoming connections 
    let server = tcp.incoming().for_each(|(tcp, _)| { 

     connections.lock().unwrap().push(tcp); 
     // Split up the read and write halves 
     let (reader, writer) = tcp.split(); 

     // Future of the copy 
     let bytes_copied = io::copy(reader, writer); 

     // ... after which we'll print what happened 
     let handle_conn = bytes_copied.map(|(n, _, _)| { 
      println!("wrote {} bytes", n) 
     }).map_err(|err| { 
      println!("IO error {:?}", err) 
     }); 

     // Spawn the future as a concurrent task 
     handle.spawn(handle_conn); 

     Ok(()) 
    }); 

    // Spin up the server on the event loop 
    core.run(server).unwrap(); 

} 

這是無法建立與以下錯誤的時刻:

error[E0382]: capture of moved value: `connections` 
    --> src/main.rs:36:42 
    | 
26 |  thread::spawn(move || { 
    |     ------- value moved (into closure) here 
... 
36 |  let server = tcp.incoming().for_each(|(tcp, _)| { 
    |           ^^^^^^^^^^ value captured here after move 
    | 
    = note: move occurs because `connections` has type `std::sync::Arc<std::sync::Mutex<std::vec::Vec<tokio_core::net::TcpStream>>>`, which does not implement the `Copy` trait 

error[E0382]: use of moved value: `tcp` 
    --> src/main.rs:40:32 
    | 
38 |   connections.lock().unwrap().push(tcp); 
    |           --- value moved here 
39 |   // Split up the read and write halves 
40 |   let (reader, writer) = tcp.split(); 
    |        ^^^ value used here after move 
    | 
    = note: move occurs because `tcp` has type `tokio_core::net::TcpStream`, which does not implement the `Copy` trait 

是否有可能實現這一目標而無需編寫任何不安全的代碼?

回答

5

你得到的第一個錯誤,因爲此舉關閉:

let mut connections = Arc::new((Mutex::new(Vec::new()))); 
thread::spawn(move || { 
    let mut i = connections.lock().unwrap().len(); 
    .... 
} 

實際上把整個Arc,而你只想要移動它的「部分」(即,在這樣的移動它引用計數遞增的方式,並且兩個線程都可以使用它)。

要做到這一點,我們可以使用Arc::clone

let mut connections = Arc::new((Mutex::new(Vec::new()))); 
let conn = connections.clone(); 
thread::spawn(move || { 
    let mut i = conn.lock().unwrap().len(); 
    .... 
} 

這樣,克隆Arcconn,移入關閉,原來Arcconnections,是不是,所以仍然可用。

我不確定你在做什麼,你的第二個錯誤,但爲了簡單計算連接,你不需要push整個事情。

+0

非常感謝您的回答!正如問題中提到的,最終我希望能夠擁有所有當前活動連接的向量,並能夠遍歷向量並向每個向其廣播消息。是否有可能將每個tcp連接推向向量? – John

+0

您可以像處理矢量一樣,嘗試將連接包裝在一個'Arc >'中,但這限制了您可以對其執行的操作(例如,您無法在'.split'調用中移動它。 – MartinHaTh

+0

我意識到我錯過了第一個線程的循環。也許現在會更有意義。 – John