4
我有一個Hyper HTTP請求期貨的大型矢量,並且想要將它們解析爲結果向量。由於有最大打開文件的限制,我想限制併發到N期貨。加入期貨有限的併發
我已經用Stream::buffer_unordered
進行了實驗,但似乎是一個接一個地執行期貨。
我有一個Hyper HTTP請求期貨的大型矢量,並且想要將它們解析爲結果向量。由於有最大打開文件的限制,我想限制併發到N期貨。加入期貨有限的併發
我已經用Stream::buffer_unordered
進行了實驗,但似乎是一個接一個地執行期貨。
我們在項目中使用了code like this以避免打開太多的TCP套接字。這些期貨內有Hyper期貨,所以看起來正好是同樣的情況。
// Convert the iterator into a `Stream`. We will process
// `PARALLELISM` futures at the same time, but with no specified
// order.
let all_done =
futures::stream::iter(iterator_of_futures.map(Ok))
.buffer_unordered(PARALLELISM);
// Everything after here is just using the stream in
// some manner, not directly related
let mut successes = Vec::with_capacity(LIMIT);
let mut failures = Vec::with_capacity(LIMIT);
// Pull values off the stream, dividing them into success and
// failure buckets.
let mut all_done = all_done.into_future();
loop {
match core.run(all_done) {
Ok((None, _)) => break,
Ok((Some(v), next_all_done)) => {
successes.push(v);
all_done = next_all_done.into_future();
}
Err((v, next_all_done)) => {
failures.push(v);
all_done = next_all_done.into_future();
}
}
}
這在一塊示例代碼被使用,所以事件循環(core
)被明確地驅動。觀看程序使用的文件句柄的數量表明它已被封頂。另外,在添加這個瓶頸之前,我們很快耗盡了允許的文件句柄,而之後我們沒有。
你可以發佈你已經有的代碼嗎? –
請[編輯]你的問題來解釋你爲什麼說「似乎是一個接一個地執行期貨」。我已經使用'buffer_unordered'來達到**這個確切的目的**,並且它對我有用。 – Shepmaster