2014-03-28 24 views
1

我有一些代碼將工作添加到一個隊列中,並由多個工作人員處理,然後將結果放入另一個隊列,並由他們處理最後的工人。當我有多個生產者將材料添加到此結果隊列中時,如何可靠地指示collector進程沒有更多要處理的內容?告訴多個生產者的消費者沒有更多的結果

import multiprocessing 
import time 

J = multiprocessing.Queue() 
R = multiprocessing.Queue() 

def intermediate_worker(jobs, results): 
    while True: 
     task = jobs.get() 
     if task is None: 
      jobs.put(None) 
      break 
     print 'working', task 
     results.put(task) 

def collector(result_queue) : 
    total = 0 
    while True : 
     result = result_queue.get() 
     total += result 
     print 'collection', total 


[multiprocessing.Process(target=intermediate_worker, args=(J,R)).start() 
for i in xrange(2)] 

multiprocessing.Process(target=collector, args=(R,)).start() 

for chunk_dummy in xrange(10) : 
    J.put(chunk) 
J.put(None) 

回答

2

有幾種方法。一種是尋找n毒丸,其中n是你有多少生產者。您已經熟悉這種方法,因爲您正在使用它關閉生產者。這可以工作,但有點笨重 - 你需要額外的邏輯在你的消費者跟蹤你見過多少毒丸。

我個人最喜歡的是用Semaphore來爲我計數。如果我們知道有多少生產者還活着,那麼這就是足夠的信息來確定什麼時候關閉消費者 - 我們在(no producers are alive)(the queue is empty)時關閉它。

J = multiprocessing.Queue() 
R = multiprocessing.Queue() 
S = multiprocessing.Semaphore(NUMWORKERS) 

def intermediate_worker(jobs, results, sem): 
    with sem: #context manager handles incrementing/decrementing the semaphore 
     while True: 
      task = jobs.get() 
      if task is None: 
       jobs.put(None) 
       break 
      print 'working', task 
      results.put(task) 

def collector(result_queue, sem) : 
    total = 0 
    while sem.get_value() < NUMWORKERS or not result_queue.empty(): 
     result = result_queue.get() 
     total += result 
     print 'collection', total 

[multiprocessing.Process(target=intermediate_worker, args=(J,R,S)).start() 
for i in xrange(NUMWORKERS)] 

multiprocessing.Process(target=collector, args=(R,S)).start() 

粗糙的代碼,完全未經測試,但應該得到重點。

相關問題