2016-08-11 62 views
7

我正在嘗試創建一個系統,通過它我的應用程序可以從Redis PubSub渠道接收流式數據並對其進行處理。我使用,與其他所有的Redis驅動鐵鏽,我已經看到沿Redis driver,使用阻塞操作從只有在接收到的數據返回一個值的通道獲得數據:如何使用futures.rs和Redis PubSub實現阻塞呼叫的期貨流?

let msg = match pubsub.get_message() { 
     Ok(m) => m, 
     Err(_) => panic!("Could not get message from pubsub!") 
}; 
let payload: String = match msg.get_payload() { 
    Ok(s) => s, 
    Err(_) => panic!("Could not convert redis message to string!") 
}; 

我我想在將來使用futures-rs庫來封裝此阻塞函數調用,以便在等待輸入時我可以在我的應用程序中執行其他任務。

我讀了tutorial期貨,並試圖創建一個Stream,這將顯示PubSub收到數據時的信號,但我無法弄清楚如何操作。

如何創建schedulepoll函數用於阻止pubsub.get_message()函數?

+5

使用圖書館的當天它有大的公告;多麼雄心勃勃!^_^ – Shepmaster

回答

10

沉重的警告我從來沒有使用過這個庫,而且我對一些概念的底層知識有點......缺乏。大多數情況下,我通過the tutorial閱讀。我很確定,任何做過異步工作的人都會閱讀並大笑,但這對其他人來說可能是一個有用的起點。買者自負!


讓我們用一些簡單一點開始,演示如何Stream的作品。我們可以的Result的Iterator轉換成流:

extern crate futures; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())]; 
    let payloads = stream::iter(payloads.into_iter()); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
} 

這說明我們消費流的一種方式。我們使用and_then對每個有效負載(這裏只是將其打印出來)做一些事情,然後使用for_eachStream轉換回Future。然後我們可以通過調用奇怪的名稱forget method來運行未來。


接下來是將Redis庫綁定到混合中,只處理一條消息。由於get_message()方法處於阻塞狀態,因此我們需要在混合中引入一些線程。在這種類型的異步系統中執行大量工作並不是一個好主意,因爲其他一切都將被阻止。 For example

除另有安排是這樣的,應當確保該功能完成的實現非常快

在一個理想的世界裏,redis板條箱將建立在像期貨這樣的圖書館之上,並將所有這些本地化。

extern crate redis; 
extern crate futures; 

use std::thread; 
use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let msg = pubsub.get_message().expect("Unable to get message"); 
     let payload: Result<String, _> = msg.get_payload(); 
     tx.send(payload).forget(); 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

我的理解在這裏變得更模糊。在單獨的線程中,我們阻止該消息,並在獲取消息時將其推送到頻道中。我不明白的是爲什麼我們需要抓住線程的手柄。我期望foo.forget會阻止自己,等待流是空的。

在telnet連接到Redis的服務器,發送此:

publish rust awesome 

你會看到它的工作原理。添加打印語句表明(對我來說)foo.forget語句是在線程產生之前運行的。


多條消息比較棘手。 Sender消耗自身,以防止發電方過於耗費方太多。這是通過從send返回另一個未來!我們需要穿梭回離開那裏重用它循環的下一次迭代:

extern crate redis; 
extern crate futures; 

use std::thread; 
use std::sync::mpsc; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let mut tx = tx; 

     while let Ok(msg) = pubsub.get_message() { 
      let payload: Result<String, _> = msg.get_payload(); 

      let (next_tx_tx, next_tx_rx) = mpsc::channel(); 

      tx.send(payload).and_then(move |new_tx| { 
       next_tx_tx.send(new_tx).expect("Unable to send successor channel tx"); 
       futures::finished(()) 
      }).forget(); 

      tx = next_tx_rx.recv().expect("Unable to receive successor channel tx"); 
     } 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

我相信會有這種類型的互操作的多個生態系統隨着時間的推移。例如,futures-cpupool箱子可以可能進行擴展以支持類似的用例來此。

+0

感謝您的驚人答案!只有一個問題:不加入'redis_thread'否定了使得結果讀取過程非阻塞的全部努力?也許有一些我不瞭解的東西。 – Ameo

+1

「我期望foo.forget會阻止自己,等到流爲空」實際上,期貨沒有義務提供「阻止直到就緒」的方法。 '忘記()',就其描述而言,需要在未來丟失時防止自動取消,但與等待無關。例如,在Scala中,沒有關於Future的方法,但是有一個獨立的'Await.ready' /'Await.result'方法,在一定的時間內等待未來準備就緒。 –

+1

據我所知,未來-RS可以通過['Future :: select']實現類似的功能(http://alexcrichton.com/futures-rs/futures/trait.Future.html#method .select),第二個未來在固定超時後完成。 –