2016-12-24 72 views
3

我想寫futures-rs MPSC隊列使用的一個簡單的〔實施例:如何製作簡單的futures :: sync :: mpsc :: channel示例工作?

extern crate futures; 

use futures::{Sink, Stream}; 
use futures::sync::mpsc; 
use std::thread; 

fn main() { 
    let (tx, rx) = mpsc::channel::<i32>(1000); 

    let handle = thread::spawn(move || { 
     tx.clone().send(1); 
     tx.clone().send(2); 
     tx.clone().send(3); 
    }); 

    let mut rx = rx.map(|x| { 
     println!("stream: {}", x); 
     x * x 
    }); 

    handle.join().unwrap(); 

    rx.poll().unwrap(); 
} 

但它沒有做任何輸出到控制檯(我希望它打印stream: 1stream: 2stream: 3)。我也嘗試用rx.wait()替換rx.poll().unwrap(),但它仍然不輸出任何內容。我在期貨rs文檔中沒有找到任何使用示例。我究竟做錯了什麼?

回答

3

它是強烈建議讀取編譯器告訴你的警告和錯誤消息。這是一個靜態類型語言的一個編譯器的大好處之一:

warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default 
    --> src/main.rs:11:9 
    | 
11 |   tx.clone().send(1); 
    |   ^^^^^^^^^^^^^^^^^^^ 

warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default 
    --> src/main.rs:12:9 
    | 
12 |   tx.clone().send(2); 
    |   ^^^^^^^^^^^^^^^^^^^ 

warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default 
    --> src/main.rs:13:9 
    | 
13 |   tx.clone().send(3); 
    |   ^^^^^^^^^^^^^^^^^^^ 

我不是專家與期貨,但這編譯沒有警告,並打印出所有三個值:

extern crate futures; 

use futures::{Future, Sink, Stream, Async}; 
use futures::sync::mpsc; 
use std::thread; 

fn main() { 
    let (tx, rx) = mpsc::channel::<i32>(1000); 

    let handle = thread::spawn(move || { 
     tx.send(1) 
      .and_then(|tx| tx.send(2)) 
      .and_then(|tx| tx.send(3)) 
      .wait() 
      .expect("Unable to send"); 
    }); 

    let mut rx = rx.map(|x| x * x); 

    handle.join().unwrap(); 

    while let Ok(Async::Ready(Some(v))) = rx.poll() { 
     println!("stream: {}", v); 
    } 
} 

and_then用於發送前一個值之後的每個後續值。 wait用於阻止生成的線程,直到所有內容都成功發送。 poll方法用於從隊列中獲取值直到耗盡。有多種方式可能會失敗,我忽略了所有這些,只關注成功案例。