1

我目前有一個生成器的形式的代碼調用一個IO綁定的任務。發電機實際上也會調用子發電機,所以我們將會讚賞更通用的解決方案。我應該在這裏使用協程或其他調度對象嗎?

類似以下內容:

def processed_values(list_of_io_tasks): 
    for task in list_of_io_tasks: 
     value = slow_io_call(task) 
     yield postprocess(value) # in real version, would iterate over 
           # processed_values2(value) here 

我有完全的控制權slow_io_call,我不在乎以什麼順序我從processed_values的項目。有沒有類似於協程的東西,我可以通過將slow_io_call轉換爲異步函數並使用哪個調用返回最快來獲得最快順序的結果?我預計list_of_io_tasks至少有數千個條目。除了顯式線程外,我從來沒有做過任何並行的工作,特別是我從來沒有使用可用的各種形式的輕量線程。

我需要使用標準的CPython實現,並且我正在Linux上運行。

回答

2

聽起來像你在尋找multiprocessing.Pool(),特別是Pool.imap_unordered()方法。

以下是函數的一個端口,它使用imap_unordered()並行調用slow_io_call()。

def processed_values(list_of_io_tasks): 
    pool = multiprocessing.Pool(4) # num workers 
    results = pool.imap_unordered(slow_io_call, list_of_io_tasks) 
    while True: 
     yield results.next(9999999) # large time-out 

請注意,你也可以不用while True循環遍歷results直接(即for item in results: yield item),但是調用results.next()有超時值的工作原理是multiprocessing keyboard interrupt bug周圍,讓你殺的主要過程和所有子進程與按Ctrl-C。另請注意,StopIteration異常不會在此函數中捕獲,但當results.next()不再有項目返回時會引發StopIteration異常。這從發生器函數是合法的,比如這個函數,當沒有更多的值產生或者停止產生並且代表它引發StopIteration異常時,它們可能會引發StopIteration錯誤。

要到位的過程中使用線程,更換
import multiprocessing

import multiprocessing.dummy as multiprocessing

+0

啊,問題僅僅是slow_io_call被外包給外面蟒蛇,可能需要一段時間才能恢復 - 添加儘可能多的儘可能進入slow_io_call的實例會更好,因爲我可以發出儘可能多的同時請求(例如:查詢分佈式客戶端的信息,結合硬盤寫入,請求數量不會顯着影響響應任何請求的時間)。雖然沒有記錄,但我假設一個進程池最多進入迭代器4次,然後進行比賽,而不是暫停並進入下一個迭代步驟? – 2011-04-19 00:43:36

+0

嘗試增加池大小並比較進程與線程池,以便爲您的應用程序找到最佳解決方案,這總是相對加速和開銷增加之間的平衡。 – Garrett 2011-04-19 03:01:50

相關問題