Python multiprocessing and handling exceptions in workers

I have an algorithm built on Python's multiprocessing library in which a number of workers process some data and return the results to the parent process. I use multiprocessing.Queue both to pass jobs to the workers and to collect the results.
This all works fine until a worker fails to process some chunk of data. In the simplified example below, each worker has two phases:
- initialization — this can fail, in which case the worker should be destroyed
- data processing — processing a chunk of data can fail, in which case the worker should skip this chunk and continue with the next one.

When either of these phases fails, I get a deadlock after the script finishes. This code simulates my problem:
import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3

#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print("!Worker %d init fail" % args)
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print("!Job %d failed, skip..." % job)
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()

#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# fewer results than expected?
for r in range(results_to_expect):
    result = results.get()
    print(result)

# Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

# Wait for them to finish
jobs.join()
I have two questions about this code:

- When init() fails, how do I detect that the worker is invalid instead of waiting for it to finish?
- When do_work() fails, how do I notify the parent process that fewer results should be expected in the result queue?
Thank you for your help!
Alternatively, you can put a tuple (result, error) into the result queue (with error set to None on success), so that errors are communicated in-band with the results. – jfs