我正在設計一個Python多處理代碼,以便在可能隨處理更新的隊列中工作。下面的代碼有時會起作用,或者卡住,或者出現Empty錯誤。使用管理器更新Python多進程中的隊列
import multiprocessing as mp
def worker(working_queue, output_queue):
while True:
if working_queue.empty() is True:
break
else:
picked = working_queue.get_nowait()
if picked % 2 == 0:
output_queue.put(picked)
else:
working_queue.put(picked+1)
return
if __name__ == '__main__':
manager = mp.Manager()
static_input = xrange(100)
working_q = manager.Queue()
output_q = mp.Queue()
for i in static_input:
working_q.put(i)
processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
results_bank = []
while True:
if output_q.empty() is True:
break
results_bank.append(output_q.get_nowait())
print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
results_bank.sort()
print results_bank
我應該使用一個列表作爲全局變量,並鎖定它,而不是一個manager.Queue()?
此代碼工作就好了我的機器上。 – plover
感謝您的評論。你能多試幾次嗎?對我來說,它在十分之三左右的時候效果很好。 – Jaqo
問題出在'如果working_queue.empty()爲真:'一個進程可以在另一個進程彈出最後一個值之前檢查是否爲空隊列,然後調用'working_queue.get_nowait()'會引發異常 –