2017-09-28 33 views
0

我試圖找出一種很好的方法來使用Python 3.6中的multiprocessing包運行一組大約100個任務,其中最多同時運行4個任務。我也想要:Python多處理:檢索下一個結果

  1. 反覆收集池中下一個完成的任務並處理其返回值,直到所有任務都成功或失敗;
  2. 使任何給定任務中拋出的異常非致命,所以我仍然可以從其他任務中訪問結果。

我不需要維護提交給池的任務的順序(即我不需要隊列)。任務總數(「100」以上)不是非常大,例如,我不介意一次全部提交,讓他們排隊,直到有工人。

我認爲multiprocessing.Pool會很適合這個,但我似乎無法找到可以迭代調用的「get next result」方法。

這是我將不得不從流程管理原語中滾動嗎?或者可以Pool(或者我缺少的其他東西)支持這個工作流程?

對於上下文,我使用每個worker調用可能需要幾分鐘的遠程進程,並且有能力同時處理N個作業(上面具體化示例中的「4」)。

+0

http://pyvideo.org/search.html?q=multiprocessing – wwii

+0

@wwii是否有一些視頻特別推薦您解決這個問題? –

+0

只有一般 - 我覺得Pycon的視頻講述的內容非常豐富。另外,[multiprocessing module documentation](https://docs.python.org/3/library/multiprocessing.html)中給出的示例似乎足以讓我在開始玩遊戲時進行實驗。 – wwii

回答

0

我想出了以下模式(顯示使用2名工人& 6個就業機會,而不是4 & 100):

import random 
import time 
from multiprocessing import Pool, TimeoutError 
from queue import Queue 


def worker(x): 
    print("Start: {}".format(x)) 
    time.sleep(5 * random.random()) # Sleep a random amount of time 
    if x == 2: 
     raise Exception("Two is bad") 
    return x 


if __name__ == '__main__': 

    with Pool(processes=2) as pool: 
     jobs = Queue() 
     for i in range(6): 
      jobs.put(pool.apply_async(worker, [i])) 

     while not jobs.empty(): 
      j = jobs.get(timeout=1) 
      try: 
       r = j.get(timeout=0.1) 
       print("Done: {}".format(r)) 
      except TimeoutError as e: 
       jobs.put(j) # Not ready, try again later 
      except Exception as e: 
       print("Exception: {}".format(e)) 

似乎工作得很好:

Start: 0 
Start: 1 
Start: 2 
Done: 1 
Start: 3 
Exception: Two is bad 
Start: 4 
Start: 5 
Done: 3 
Done: 4 
Done: 5 
Done: 0 

我會看看我是否可以製作一個通用的工具來管理我的排隊。

我認爲它的主要缺點是完成的工作可能會被忽略一段時間,而未完成的工作被輪詢並可能超時。避免這可能需要使用回調 - 如果它成爲一個足夠大的問題,我可能會添加到我的應用程序。

+0

你不應該排隊工作,但結果。對於'apply_async'的'callback'參數,您可以設置一個將結果放入隊列的函數。然後一個單獨的線程可以順序地「獲得」結果。 –