2017-04-06 23 views
4

我有一個Hyper HTTP請求期貨的大型矢量,並且想要將它們解析爲結果向量。由於有最大打開文件的限制,我想限制併發到N期貨。加入期貨有限的併發

我已經用Stream::buffer_unordered進行了實驗,但似乎是一個接一個地執行期貨。

+2

你可以發佈你已經有的代碼嗎? –

+0

請[編輯]你的問題來解釋你爲什麼說「似乎是一個接一個地執行期貨」。我已經使用'buffer_unordered'來達到**這個確切的目的**,並且它對我有用。 – Shepmaster

回答

4

我們在項目中使用了code like this以避免打開太多的TCP套接字。這些期貨內有Hyper期貨,所以看起來正好是同樣的情況。

// Convert the iterator into a `Stream`. We will process 
// `PARALLELISM` futures at the same time, but with no specified 
// order. 
let all_done = 
    futures::stream::iter(iterator_of_futures.map(Ok)) 
    .buffer_unordered(PARALLELISM); 

// Everything after here is just using the stream in 
// some manner, not directly related 

let mut successes = Vec::with_capacity(LIMIT); 
let mut failures = Vec::with_capacity(LIMIT); 

// Pull values off the stream, dividing them into success and 
// failure buckets. 
let mut all_done = all_done.into_future(); 
loop { 
    match core.run(all_done) { 
     Ok((None, _)) => break, 
     Ok((Some(v), next_all_done)) => { 
      successes.push(v); 
      all_done = next_all_done.into_future(); 
     } 
     Err((v, next_all_done)) => { 
      failures.push(v); 
      all_done = next_all_done.into_future(); 
     } 
    } 
} 

這在一塊示例代碼被使用,所以事件循環(core)被明確地驅動。觀看程序使用的文件句柄的數量表明它已被封頂。另外,在添加這個瓶頸之前,我們很快耗盡了允許的文件句柄,而之後我們沒有。