2017-01-31 18 views
1

Rust futures-cpupool:不一致行爲的解釋

我在使用 CPU 池(cpu pool)時遇到了奇怪的行爲:

#[macro_use] 
extern crate lazy_static; 
extern crate tokio_core; 
extern crate futures; 
extern crate futures_cpupool; 

use std::time::Duration; 

use futures_cpupool::{CpuPool, Builder, CpuFuture}; 
use futures::Stream; 
use futures::{Future, future, lazy}; 
use futures::sync::mpsc; 
use futures::Sink; 

lazy_static! {
    // Shared worker pool used by the whole program: ten threads, each of
    // which logs one line when it starts and another just before it stops.
    static ref CPU_POOL: CpuPool = Builder::new()
        .pool_size(10)
        .after_start(|| println!("Pool started one thread"))
        .before_stop(|| println!("Pool stopped one thread"))
        .create();
}

/// Empty type with no fields; it only serves as the receiver of `search_names`.
struct Producer {} 

impl Producer { 
    /// Spawns a task on `CPU_POOL` that sends the string "name" into a channel
    /// (up to ten times, sleeping one second after each successful send) and
    /// returns the receiving end of that channel as a boxed stream of `String`s.
    ///
    /// NOTE(review): `producer_cpu` is never used after it is created, so it goes
    /// out of scope when this function returns. Dropping a `CpuFuture` appears to
    /// cancel the spawned work -- TODO confirm against the futures-cpupool docs.
    fn search_names(&self) -> Box<Stream<Item = String, Error = String> + Send> { 
     // Channel created with a buffer argument of 1; each message is itself a
     // `Result` so the sender could forward either a name or an error string.
     let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1); 

     println!("Creating producer thread..."); 
     // `lazy` defers the closure until the spawned future is first polled.
     let producer_cpu: CpuFuture<(),()> = CPU_POOL.spawn(lazy(move || { 
       println!(" -- Begin to produce names"); 
       for i in 0..10 { 
        // `send` consumes the sender; `wait` blocks the current thread until
        // the send completes and hands the sender back on success.
        match tx.send(Ok("name".to_string())).wait() { 
         Ok(t) => { 
          println!(" -- sent the name"); 
          tx = t 
         } 
         Err(err) => { 
          // Stop producing as soon as a send fails.
          println!(" -- Error occured sending name! {:?}", err); 
          break; 
         } 
        } 
        std::thread::sleep(Duration::from_secs(1)); 
       } 
       future::ok::<(),()>(()) 
      }) 
      // Log how the producer ended; the combined future always resolves to Ok(()).
      .then(|result| { 
       match result { 
        Ok(data) => println!("Producer finished with data: {:?}", data), 
        Err(err) => println!("Producer finished with error: {:?}", err), 
       } 
       future::ok::<(),()>(()) 
      })); 

     // Unwrap the receiver's own result (panicking if the receiver yields an
     // error) so that the inner `Result<String, String>` sent through the
     // channel becomes the item/error of the returned stream.
     rx.then(|r| r.unwrap()).boxed() 
    } 
} 

/// Entry point: drains the producer's name stream on the shared pool and
/// blocks until the draining task has finished.
fn main() {
    let source = Producer {};

    // Stage 1: echo every name as it arrives, passing it through unchanged.
    let echoed = source.search_names().map(|name| {
        println!("name = {:?}", name);
        name
    });

    // Stage 2: gather all names, then report how the stream ended.
    let report = echoed.collect().then(|outcome| {
        match outcome {
            Ok(data) => println!("Finished to read producer {:?}", data),
            Err(err) => println!("Error reading stream of producer! {:?}", err),
        }
        future::ok::<(), ()>(())
    });

    // The reporting stage always resolves to Ok(()), so the result of the
    // wait carries no information and is discarded.
    let _ = CPU_POOL.spawn(report).wait();
}

以下是相應的Cargo.toml

# Package manifest for the example program.
[package] 
name = "example" 
version = "0.1.0" 

[dependencies] 
# NOTE(review): `^0.1.*` combines a caret with a wildcard; confirm Cargo accepts this form.
lazy_static = "^0.1.*" 

# The listing declares `extern crate tokio_core` but uses nothing from it.
tokio-core = "^0.1" 
futures = "^0.1" 
futures-cpupool = "^0.1" 

我使用的是 Rust nightly 版本(1.16.0-nightly (df8debf6d 2017-01-25))。

我期望這個程序生成 10 個 `String`,通過 `println` 輸出它們,然後退出。但是,大多數情況下,該程序不會生成任何 `String` 就正常退出;其他時候 `String` 則能正確生成。

這是第一種情況下的輸出:

Creating producer thread... 
Pool started one thread 
Finished to read producer [] 
Pool started one thread 
Pool started one thread 
Pool started one thread 
Pool started one thread 

以下是 `String` 被生成時的輸出:

Pool started one thread 
Pool started one thread 
Pool started one thread 
Pool started one thread 
Creating producer thread... 
-- Begin to produce names 
-- sent the name 
name = "name" 
Pool started one thread 
-- sent the name 
name = "name" 
Producer finished with data:() 
Finished to read producer ["name", "name"] 

我的感覺是,在第一種情況下,生產者線程出於某種原因沒有被調度到線程池上執行。我一定是漏掉了什麼,但我不知道是什麼。

回答

0

問題的原因是生產者 future 過早被丟棄(drop)。

在 `search_names` 方法中,當 `search_names` 返回時,負責生成值的 `CpuFuture` 被丟棄(drop)。`CpuFuture` 在被丟棄時會被取消,因此跳過了值的生成。行爲上的差異肯定來自 future 的丟棄與其執行之間的競爭。

一種解決方案是在整個應用程序運行期間始終持有對生產者 future 的引用,如下所示:

#[macro_use] 
extern crate lazy_static; 
extern crate tokio_core; 
extern crate futures; 
extern crate futures_cpupool; 

use std::time::Duration; 

use futures_cpupool::{CpuPool, Builder, CpuFuture}; 
use futures::Stream; 
use futures::{Future, future, lazy}; 
use futures::sync::mpsc; 
use futures::Sink; 

lazy_static! {
    // Shared worker pool for the example: five threads, with a log line
    // printed whenever one of them starts and again right before it stops.
    static ref CPU_POOL: CpuPool = Builder::new()
        .pool_size(5)
        .after_start(|| println!("Pool started one thread"))
        .before_stop(|| println!("Pool stopped one thread"))
        .create();
}

struct Producer {} 

impl Producer { 
    fn search_names(&self) -> (CpuFuture<(),()>, Box<Stream<Item = String, Error = String> + Send>) { 
     let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1); 

     println!("Creating producer thread..."); 
     let producer_cpu: CpuFuture<(),()> = CPU_POOL.spawn(
      lazy(move || { 
       println!(" -- Begin to produce names"); 
       for i in 0..2 { 
        match tx.send(Ok("name".to_string())).wait() { 
         Ok(t) => { 
          println!(" -- sent the name"); 
          tx = t 
         }, 
         Err(err) => { 
          println!(" -- Error occured sending name! {:?}", err); 
          break 
         }, 
        } 
        std::thread::sleep(Duration::from_secs(1)); 
       } 
       future::ok::<(),()>(()) 
      }).then(|result| { 
       match result { 
        Ok(data) => println!("Producer finished with data: {:?}", data), 
        Err(err) => println!("Producer finished with error: {:?}", err), 
       } 
       future::ok::<(),()>(()) 
      }) 
     ); 

     (
      producer_cpu, 
      rx.then(|r| r.unwrap()).boxed() 
     ) 
    } 
} 

/// Entry point: starts the producer, drains its name stream on the shared
/// pool, and blocks until both the consumer and the producer have finished.
fn main() {
    let producer = Producer {};

    // The producer's handle must stay bound to a variable for the whole run:
    // as the surrounding explanation states, dropping the `CpuFuture` early
    // cancels the producer. It gets its own name (rather than `future`) so it
    // cannot be confused with the imported `future` module used below.
    let (producer_task, stream) = producer.search_names();
    let names = CPU_POOL.spawn(
        stream
            .map(|name| {
                println!("name = {:?}", name);
                name
            })
            .collect()
            .then(|result| {
                match result {
                    Ok(data) => println!("Finished to read producer {:?}", data),
                    Err(err) => println!("Error reading stream of producer! {:?}", err),
                }
                future::ok::<(), ()>(())
            }),
    );

    // Both tasks end with `future::ok(())`, so an `Err` here would be a bug.
    names.wait().expect("consumer task always resolves to Ok");
    // Waiting on the producer as well lets it finish its own logging before
    // the process exits, and gives the kept-alive handle an explicit use.
    producer_task.wait().expect("producer task always resolves to Ok");
}
+1

這就是爲什麼編譯器會提示你 *warning: unused variable: `producer_cpu`* – Shepmaster