2012-07-18 95 views
6

Python multiprocessing pool retries

Is there a way to resubmit a chunk of data for processing if the original computation fails, using a simple pool?

import random 
from multiprocessing import Pool 

def f(x): 
    if random.getrandbits(1): 
        raise ValueError("Retry this computation") 
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
+1

Maybe you want `return f(x)` instead of raising the `ValueError`? Just guessing... – 2012-07-24 02:03:16

+0

How high is the chance of failure in the real application? That is, how important is it for a process to retry immediately, compared with waiting for the other processes to finish first? – Isaac 2012-07-24 02:05:12

+0

There is a moderate chance of failure, and it does not need to be retried immediately (but it should eventually be retried in parallel). – ash 2012-07-24 05:29:48

Answers

9

If you can (or don't mind) retrying immediately, wrap the function with a decorator:

import random 
from multiprocessing import Pool 
from functools import wraps 

def retry(f): 
    @wraps(f) 
    def wrapped(*args, **kwargs): 
        while True: 
            try: 
                return f(*args, **kwargs) 
            except ValueError: 
                pass 
    return wrapped 

@retry 
def f(x): 
    if random.getrandbits(1): 
        raise ValueError("Retry this computation") 
    return x*x 

if __name__ == '__main__': 
    p = Pool(5) 
    # If one of these f(x) calls fails, retry it with another (or same) process 
    p.map(f, [1,2,3]) 
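One caveat: the `while True` above retries forever if a failure is permanent. A variant with a retry budget might look like the sketch below (`max_attempts` and `flaky_square` are my names, not from the answer):

```python
import functools
import random

def retry(max_attempts=3):
    # Decorator factory: retry the wrapped function up to max_attempts
    # times, re-raising the last ValueError once the budget is spent.
    def decorator(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return f(*args, **kwargs)
                except ValueError:
                    if attempt == max_attempts - 1:
                        raise  # budget exhausted: surface the error
        return wrapped
    return decorator

@retry(max_attempts=5)
def flaky_square(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x * x
```

Because `functools.wraps` preserves the function's name, the decorated function still pickles by reference and can be passed to `Pool.map` like the original.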
5

You can use a Queue to feed failures back into the Pool, driven by a loop in the Process that launched it:

import multiprocessing as mp 
import random 

def f(x): 
    if random.getrandbits(1): 
        # on failure/exception catch 
        f.q.put(x) 
        return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    total_items = len(pending) 
    successful = [] 
    failure_tracker = [] 

    q = mp.Queue() 
    p = mp.Pool(None, f_init, [q]) 
    results = p.imap(f, pending) 
    retry_results = [] 
    while len(successful) < total_items: 
        successful.extend([r for r in results if r is not None]) 
        successful.extend([r for r in retry_results if r is not None]) 
        failed_items = [] 
        while not q.empty(): 
            failed_items.append(q.get()) 
        if failed_items: 
            failure_tracker.append(failed_items) 
            retry_results = p.imap(f, failed_items) 
    p.close() 
    p.join() 

    print("Results: %s" % successful) 
    print("Failures: %s" % failure_tracker) 

if __name__ == '__main__': 
    main(range(1, 10)) 

The output looks like this:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9] 
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []] 

A Pool cannot be shared between multiple processes, hence this Queue-based approach. If you try to pass the Pool itself as an argument to the pool's processes, you will get this error:

NotImplementedError: pool objects cannot be passed between processes or pickled 

Alternatively, you could attempt a small number of immediate retries inside your function f to avoid the synchronization overhead. It really comes down to how long your function should wait before retrying, and how likely an immediate retry is to succeed.


Old answer: For completeness, here is my earlier answer. It is not optimal, since it does not resubmit directly into the pool, but it may still be relevant depending on the use case, because it provides a natural way to handle/limit n-level retries:

You can use a Queue to collect failures and resubmit them at the end of each run, over multiple runs:

import multiprocessing as mp 
import random 


def f(x): 
    if random.getrandbits(1): 
        # on failure/exception catch 
        f.q.put(x) 
        return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    run_number = 1 
    while pending: 
        jobs = pending 
        pending = [] 

        q = mp.Queue() 
        p = mp.Pool(None, f_init, [q]) 
        results = p.imap(f, jobs) 
        p.close() 

        p.join() 
        failed_items = [] 
        while not q.empty(): 
            failed_items.append(q.get()) 
        successful = [r for r in results if r is not None] 
        print("(%d) Succeeded: %s" % (run_number, successful)) 
        print("(%d) Failed: %s" % (run_number, failed_items)) 
        print() 
        pending = failed_items 
        run_number += 1 

if __name__ == '__main__': 
    main(range(1, 10)) 

with output like this:

(1) Succeeded: [9, 16, 36, 81] 
(1) Failed: [2, 1, 5, 7, 8] 

(2) Succeeded: [64] 
(2) Failed: [2, 1, 5, 7] 

(3) Succeeded: [1, 25] 
(3) Failed: [2, 7] 

(4) Succeeded: [49] 
(4) Failed: [2] 

(5) Succeeded: [4] 
(5) Failed: [] 
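The multi-run control flow above can be sketched without the Pool/Queue plumbing, which also makes it easy to cap the number of runs (the `max_runs` cap and all names here are my additions, not part of the answer):

```python
def run_with_retries(items, worker, max_runs=3):
    # Run worker over the pending items, collect failures, and
    # resubmit them the next run, giving up after max_runs rounds.
    pending = list(items)
    successful = []
    for run_number in range(1, max_runs + 1):
        if not pending:
            break
        failed = []
        for x in pending:
            try:
                successful.append(worker(x))
            except ValueError:
                failed.append(x)
        pending = failed
    # pending now holds the items that never succeeded
    return successful, pending
```

In the parallel version, the inner `for` loop corresponds to one `p.imap(f, jobs)` pass and `failed` to the items drained from the Queue.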
+0

I have updated my answer to one that does not require multiple runs; it now retries within the same original pool. – 2012-07-24 04:44:12

+0

Thanks for the detailed reply. I like the idea of putting failed computations into a queue for retrying. I had to award Andrew, though, since his solution simply retries. – ash 2012-07-24 20:10:38

+0

@ash I mentioned immediate retries in my reply, thinking it would be a trivial/easy addition rather than what you wanted. Note also that immediate retries are not optimal in all cases, especially those where an immediate retry has a low chance of success (there it would be badly suboptimal, since it starves work that could succeed of resources). Congrats to Andrew anyway. – 2012-07-26 04:26:32