
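Python multiprocessing and handling exceptions in workers

I'm using the Python multiprocessing library for an algorithm in which many workers process some data and return the results to the parent process. I use one multiprocessing.Queue to pass jobs to the workers and a second one to collect the results.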

This all works fine until a worker fails to process some chunk of data. In the simplified example below, each worker has two phases:

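  • Initialization - may fail, in which case the worker should be destroyed.
  • Data processing - processing a chunk of data may fail, in which case the worker should skip that chunk and continue with the next one.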

When either of these phases fails, I get a deadlock after the script completes. This code simulates my problem:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except Exception:
        print("!Worker %d init fail" % args)
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except Exception:
            print("!Job %d failed, skip..." % job)
        finally:
            jobs_queue.task_done()
    # Mark the stop token (None) as done as well
    jobs_queue.task_done()


#========= Parent =========
# The __main__ guard is required where child processes are spawned (Windows, macOS)
if __name__ == "__main__":
    jobs = mp.JoinableQueue()
    results = mp.Queue()
    for i in range(workers_count):
        mp.Process(target=worker_function, args=(i, jobs, results)).start()

    # Populate jobs queue
    results_to_expect = 0
    for j in range(30):
        jobs.put(j)
        results_to_expect += 1

    # Collecting the results
    # What if some workers failed to process the job and we have
    # fewer results than expected?
    for r in range(results_to_expect):
        result = results.get()
        print(result)

    # Signal all workers to finish
    for i in range(workers_count):
        jobs.put(None)

    # Wait for them to finish
    jobs.join()

I have two questions about this code:

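  1. When init() fails, how can I detect that the worker is invalid instead of waiting for it to finish?
  2. When do_work() fails, how can I tell the parent process that it should expect fewer results in the result queue?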

Thanks for your help!

Answer


I changed your code slightly to make it work (see the explanation below).

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except Exception:
        print("!Worker %d init fail" % args)
        # Tell the parent that this worker is gone
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except Exception:
            print("!Job %d failed, skip..." % job)
            # Still report something so the parent can count this job
            result_queue.put('job failed')


#========= Parent =========
# The __main__ guard is required where child processes are spawned (Windows, macOS)
if __name__ == "__main__":
    jobs = mp.Queue()
    results = mp.Queue()
    for i in range(workers_count):
        mp.Process(target=worker_function, args=(i, jobs, results)).start()

    # Populate jobs queue
    results_to_expect = 0
    for j in range(30):
        jobs.put(j)
        results_to_expect += 1

    init_failures = 0
    job_failures = 0
    successes = 0
    # Stop once every job is accounted for, or once every worker died in init()
    while (job_failures + successes < results_to_expect
           and init_failures < workers_count):
        result = results.get()
        init_failures += int(result == 'init failed')
        job_failures += int(result == 'job failed')
        successes += int(result != 'init failed' and result != 'job failed')
        # print(init_failures, job_failures, successes)

    # One stop token per worker, whether or not that worker is still alive
    for ii in range(workers_count):
        jobs.put(None)

My changes:

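  1. Changed jobs to a plain Queue (instead of a JoinableQueue).
  2. Workers now send back the special result strings 'init failed' and 'job failed'.
  3. The main process monitors the results for these special strings for as long as its loop condition holds: it stops once every job has produced either a real result or 'job failed', or once every worker has reported 'init failed'.
  4. At the end, put one "stop" request (i.e. a None job) per worker, regardless of how many workers are still alive. Note that not all of them may be pulled from the queue (if some workers failed to initialize).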
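The stopping rule in item 3 is the part that prevents the deadlock, so here is a minimal sketch that pulls the parent's counting loop out into a function that can be exercised without starting any processes. `count_results`, `INIT_FAILED` and `JOB_FAILED` are hypothetical names introduced only for this sketch; it assumes results arrive through a blocking zero-argument callable such as `results.get`.

```python
# Hypothetical constant names; the answer uses these literal strings inline.
INIT_FAILED = "init failed"
JOB_FAILED = "job failed"


def count_results(get_result, n_jobs, n_workers):
    """Tally results the way the answer's parent loop does.

    get_result is any zero-argument callable that returns the next result
    (e.g. results.get for a multiprocessing.Queue). The loop stops once every
    job is accounted for (real result or 'job failed') or once every worker
    has reported 'init failed', so it never waits for results that will not come.
    """
    init_failures = job_failures = successes = 0
    while job_failures + successes < n_jobs and init_failures < n_workers:
        result = get_result()
        if result == INIT_FAILED:
            init_failures += 1
        elif result == JOB_FAILED:
            job_failures += 1
        else:
            successes += 1
    return init_failures, job_failures, successes


if __name__ == "__main__":
    # Simulated result stream: one of two workers died in init(), one job failed.
    stream = iter(["init failed", "job 1 processed 0", "job failed", "job 1 processed 2"])
    print(count_results(lambda: next(stream), n_jobs=3, n_workers=2))  # -> (1, 1, 2)
```

If every worker fails in init(), the loop ends after reading n_workers 'init failed' strings, even though no job was processed.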

By the way, your original code was nice and easy to work with. The random-probability bit is pretty cool.


Or you could put a tuple (result, error) in the result queue (with error being None on success) to avoid in-band error signaling. – jfs
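A minimal sketch of jfs's suggestion, assuming the worker and the parent exchange (result, error) tuples instead of sentinel strings. `worker_loop`, `collect` and `demo_work` are hypothetical names; the demo uses `queue.Queue` in a single thread, which offers the same `put`/`get` calls as `multiprocessing.Queue`, so the outcome is deterministic. It only covers per-job failures; an init() failure still needs its own signal, as in the answer above.

```python
import queue


def worker_loop(state, do_work, jobs_queue, result_queue):
    """Consume jobs until a None stop token and report each outcome as a
    (result, error) tuple: error is None on success, result is None on failure."""
    for job in iter(jobs_queue.get, None):
        try:
            result_queue.put((do_work(state, job), None))
        except Exception as exc:
            # repr() gives a plain string, which always pickles for
            # multiprocessing.Queue; traceback objects cannot be pickled.
            result_queue.put((None, repr(exc)))


def collect(result_queue, n_expected):
    """Read exactly n_expected tuples and split successes from errors."""
    successes, errors = [], []
    for _ in range(n_expected):
        result, error = result_queue.get()
        if error is None:
            successes.append(result)
        else:
            errors.append(error)
    return successes, errors


def demo_work(state, job):
    # Hypothetical job function: fails deterministically on multiples of 3.
    if job % 3 == 0:
        raise ValueError("job %d failed" % job)
    return "job %d processed by %s" % (job, state)


if __name__ == "__main__":
    jobs, results = queue.Queue(), queue.Queue()
    for j in range(6):
        jobs.put(j)
    jobs.put(None)  # stop token
    worker_loop("w0", demo_work, jobs, results)  # run the worker inline
    ok, failed = collect(results, 6)
    print(len(ok), len(failed))  # -> 4 2
```

Because the error travels in its own slot, a job whose legitimate result happens to be the string 'job failed' is still counted as a success, which the sentinel-string version cannot guarantee.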