2013-05-21 29 views
17

我正在使用multiprocessing.Poolmultiprocessing.Queue在python中實現生產者 - 消費者模式。消費者是使用gevent產生多個任務的預分叉進程。什麼是最簡潔的方式來停止連接到無限循環中的隊列的Python多處理工作者?

下面是一個下調的代碼版本:

import gevent 
from Queue import Empty as QueueEmpty 
from multiprocessing import Process, Queue, Pool 
import signal 
import time 

# Task queue 
queue = Queue() 

def init_worker(): 
    # Ignore signals in worker 
    signal.signal(signal.SIGTERM, signal.SIG_IGN) 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 
    signal.signal(signal.SIGQUIT, signal.SIG_IGN) 

# One of the worker task 
def worker_task1(): 
    while True: 
     try: 
      m = queue.get(timeout = 2) 

      # Break out if producer says quit 
      if m == 'QUIT': 
       print 'TIME TO QUIT' 
       break 

     except QueueEmpty: 
      pass 

# Worker 
def work(): 
    gevent.joinall([ 
     gevent.spawn(worker_task1), 
    ]) 

pool = Pool(2, init_worker) 
for i in xrange(2): 
    pool.apply_async(work) 

try: 
    while True: 
     queue.put('Some Task') 
     time.sleep(2) 

except KeyboardInterrupt as e: 
    print 'STOPPING' 

    # Signal all workers to quit 
    for i in xrange(2): 
     queue.put('QUIT') 

    pool.join() 

現在,當我試圖離開它,我獲得以下狀態:

  1. 父進程正在等待孩子們的加入一個。
  2. 其中一個孩子已經失效。完成了,但父母正在等待其他孩子完成。
  3. 其他小孩正在顯示:futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...

那麼幹淨地結束這樣一個過程的正確方法是什麼?

回答

相關問題