2015-12-28 62 views
-1

我想產生X數量的池工作者,並給他們每個人工作的X%。我的問題是,由於計算類型的完成,我的答案可能會在幾分鐘或幾小時內找到,因此工作耗時大約需要20分鐘,每個額外過程需要耗費更長的時間。我想要做的是實現某種方式讓單個工作人員去「我發現它」,並使用該信號殺死游泳池的剩餘部分並繼續進行計算。優雅的終止工作者池

要點:

  • 我曾嘗試回調,他們似乎並不直到整個池完成對starmap_async運行。
  • 我只在乎找到的第一個合適的答案。
  • 我不是共享資源,但突然處理死亡,儘管粗魯,是完全可以接受的。

我也考慮過使用一個隊列,但它不會使,因爲我傳遞給每個工作的範圍已經內置到函數的參數。

下面就是我的工作非常遲鈍的版本(我的工作計算可能需要數小時才能完成超過4.2十億複雜的迭代器)。

def doWork(): 
    workers = Pool(2) 
    results = workers.starmap_async(func = distSearch , iterable = Sections1_5, callback = killPool) 
    workers.close() 
    print("Found answer : {}".format(results.get())) 
    workers.join() 

def killPool(): 
    workers.terminate() 
    print("Worker Pool Terminated") 

我也許應該指定我的過程只會在找到答案時纔會返回,否則只會在完成後退出。我已經看過this線程,但它已經完全丟失了,似乎有很多開銷,以便在應用程序池的返回/回調中始終檢查勝利條件。

我發現的所有答案都會導致監督員工池的大量開銷,我正在尋找一種解決方案,可以在工作人員級別自動獲取kill信號。

回答

1

我正在尋找一種解決方案,可以在工作人員級別自動獲取kill信號。

AFAIK,不存在。 Pool對象的方法(如Pool.terminate)應該只有用於創建池的過程中。

你可以做的是使用Pool.imap_unordered。這將在父進程中返回一個迭代器,該結果一旦可用就會生成結果。一旦彈出想要的結果,您就可以使用Pool.terminate()

編輯

  • 從看3.5實施starmap_async返回一個MapResult實例,這是的迭代器。
  • 可以將多個輸入封裝在一個元組中,並在這些列表中使用imap_unordered
+0

我正在使用'starmap_async'。它產生一個迭代器,但似乎阻塞,直到所有結果返回。 (它不應該)。這是一個已知的錯誤?我搜索並沒有發現任何記錄的問題。另外,'imap_unordered'是否支持多個函數輸入?最後一個音符在上面的代碼中,'killpool()'在工作池完成後立即運行,我只想在第一次返回時殺死它。 –

+0

@DerrickCheek我已經編輯了我的答案以解決您的評論。 –

+0

感謝您的更新!當我回家時我會試試這個。我一定困惑了迭代器的返回。我有幾個問題試圖映射多個輸入,一些結果給了迭代器,有些則不喜歡。希望這有效。我會及時通知你的。 –