沉重的警告我從來沒有使用過這個庫,而且我對一些概念的底層知識有點......缺乏。大多數情況下,我通過the tutorial閱讀。我很確定,任何做過異步工作的人都會閱讀並大笑,但這對其他人來說可能是一個有用的起點。買者自負!
讓我們用一些簡單一點開始,演示如何Stream
的作品。我們可以的Result
的Iterator轉換成流:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
這說明我們消費流的一種方式。我們使用and_then
對每個有效負載(這裏只是將其打印出來)做一些事情,然後使用for_each
將Stream
轉換回Future
。然後我們可以通過調用奇怪的名稱forget
method來運行未來。
接下來是將Redis庫綁定到混合中,只處理一條消息。由於get_message()
方法處於阻塞狀態,因此我們需要在混合中引入一些線程。在這種類型的異步系統中執行大量工作並不是一個好主意,因爲其他一切都將被阻止。 For example:
除另有安排是這樣的,應當確保該功能完成的實現非常快。
在一個理想的世界裏,redis板條箱將建立在像期貨這樣的圖書館之上,並將所有這些本地化。
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我的理解在這裏變得更模糊。在單獨的線程中,我們阻止該消息,並在獲取消息時將其推送到頻道中。我不明白的是爲什麼我們需要抓住線程的手柄。我期望foo.forget
會阻止自己,等待流是空的。
在telnet連接到Redis的服務器,發送此:
publish rust awesome
你會看到它的工作原理。添加打印語句表明(對我來說)foo.forget
語句是在線程產生之前運行的。
多條消息比較棘手。 Sender
消耗自身,以防止發電方過於耗費方太多。這是通過從send
返回另一個未來!我們需要穿梭回離開那裏重用它循環的下一次迭代:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我相信會有這種類型的互操作的多個生態系統隨着時間的推移。例如,futures-cpupool箱子可以可能進行擴展以支持類似的用例來此。
使用圖書館的當天它有大的公告;多麼雄心勃勃!^_^ – Shepmaster