Python concurrency: hangs when using apply_async()

2014-08-28

I'm trying to understand Python concurrency. As an experiment, I have the following program, which uses a process pool and dispatches workers via apply_async(). To share information between the processes (work items and results), I use queues from multiprocessing.Manager().

However, this code sometimes hangs when all the items in the work queue have been processed, and I'm not sure why. I have to run the program several times to observe the behavior.

As a side note, I can make this work correctly: I found a design pattern people sometimes call the "poison pill" approach, and it seems to work. (In my worker() method, I loop forever and break out of the loop when the work queue contains a sentinel value. I put as many sentinel values on the work queue as I have running processes.)
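That sentinel pattern can be sketched as follows (Python 3 syntax; the names `SENTINEL` and `run` are mine, and `item * 2` stands in for real work):

```python
import multiprocessing

SENTINEL = None  # any unique marker that can never be a real work item

def worker(work_queue, results_queue):
    # Loop forever; exit only when the sentinel is seen.
    # No empty() check is needed, so there is no race.
    while True:
        item = work_queue.get()
        if item is SENTINEL:
            break
        results_queue.put(item * 2)  # stand-in for real work

def run(n_procs=2, n_items=10):
    m = multiprocessing.Manager()
    work_queue = m.Queue()
    results_queue = m.Queue()
    for x in range(n_items):
        work_queue.put(x)
    # One sentinel per worker, so every worker is guaranteed to exit.
    for _ in range(n_procs):
        work_queue.put(SENTINEL)
    pool = multiprocessing.Pool(processes=n_procs)
    for _ in range(n_procs):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()
    pool.join()  # returns: every worker consumed a sentinel and finished
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())
    return results

if __name__ == '__main__':
    print(sorted(run()))
```

The key point is that each worker blocks on get() unconditionally and relies on receiving its own sentinel to stop, so the queue can be drained safely by any number of consumers.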

But I'm still interested in finding out why this code hangs. I get output like the following (the process IDs are in brackets):

Found 8 CPUs. 
Operation queue has 20 items. 
Will start 2 processes. 
Joining pool... 
[5885] entering worker() with work_queue size of 20 
[5885] processed work item 0 
[5885] worker() still running because work_queue has size 19 
[5885] processed work item 1 
[5885] worker() still running because work_queue has size 18 
[5885] processed work item 2 
[5885] worker() still running because work_queue has size 17 
[5885] processed work item 3 
[5885] worker() still running because work_queue has size 16 
[5885] processed work item 4 
[5885] worker() still running because work_queue has size 15 
[5885] processed work item 5 
[5886] entering worker() with work_queue size of 14 
[5885] worker() still running because work_queue has size 14 
[5886] processed work item 6 
[5886] worker() still running because work_queue has size 13 
[5885] processed work item 7 
[5886] processed work item 8 
[5885] worker() still running because work_queue has size 11 
[5886] worker() still running because work_queue has size 11 
[5885] processed work item 9 
[5886] processed work item 10 
[5885] worker() still running because work_queue has size 9 
[5886] worker() still running because work_queue has size 9 
[5885] processed work item 11 
[5886] processed work item 12 
[5885] worker() still running because work_queue has size 7 
[5886] worker() still running because work_queue has size 7 
[5885] processed work item 13 
[5886] processed work item 14 
[5885] worker() still running because work_queue has size 5 
[5886] worker() still running because work_queue has size 5 
[5885] processed work item 15 
[5886] processed work item 16 
[5885] worker() still running because work_queue has size 3 
[5886] worker() still running because work_queue has size 3 
[5885] processed work item 17 
[5886] processed work item 18 
[5885] worker() still running because work_queue has size 1 
[5886] worker() still running because work_queue has size 1 
[5885] processed work item 19 
[5885] worker() still running because work_queue has size 0 
[5885] worker() is finished; returning results of size 20 

(The program hangs at the last line of output. The other process, 5886, never seems to finish.)

import multiprocessing
from multiprocessing import Pool
import os

# Python 2.7.6 on Linux

# Worker (consumer) process
def worker(work_queue, results_queue):
    print "[%d] entering worker() with work_queue size of %d" % (os.getpid(), work_queue.qsize())
    while not work_queue.empty():
        item = work_queue.get()
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())

if __name__ == '__main__':

    MAX_PROCESS = 2  # Max number of processes to create
    MAX_ITEMS = 20   # Max work items to process

    m = multiprocessing.Manager()
    results_queue = m.Queue()
    work_queue = m.Queue()

    # Assign work
    for x in xrange(MAX_ITEMS):
        work_queue.put(x)

    print "Found %d CPUs." % multiprocessing.cpu_count()
    print "Operation queue has %d items." % work_queue.qsize()
    print "Will start %d processes." % MAX_PROCESS

    # Pool method
    pool = Pool(processes=MAX_PROCESS)
    for n in range(0, MAX_PROCESS):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()
    print "Joining pool..."
    pool.join()
    print "Joining pool finished..."
    print "--- After pool completion ---"

    print "Work queue has %d items." % work_queue.qsize()
    print "Results queue has %d items." % results_queue.qsize()
    print "Results are:"

    while not results_queue.empty():
        item = results_queue.get()
        print str(item)
        results_queue.task_done()
    print "--End---"

Thanks for your help.

Answer

You're hitting a race condition: process 5886 sees that the queue has one item left:

[5886] worker() still running because work_queue has size 1 

So the loop wraps back around to the blocking get call:

while not work_queue.empty():  # It sees it's not empty here...
    item = work_queue.get()    # ...but there's no item left by the time it gets here!

However, after it calls work_queue.empty() but before it calls work_queue.get(), the other worker (5885) consumed the last item in the queue:

[5885] processed work item 19 
[5885] worker() still running because work_queue has size 0 
[5885] worker() is finished; returning results of size 20 
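That interleaving can be re-enacted deterministically in a single process, with a plain queue.Queue standing in for the managed queue and comments marking the two consumers (a sketch in Python 3 syntax):

```python
import queue

q = queue.Queue()
q.put("last item")

# Consumer A's empty() check succeeds:
saw_item = not q.empty()  # True: one item is still visible

# ...but before A calls get(), consumer B drains the queue:
q.get_nowait()            # B consumes the last item

# A's subsequent blocking q.get() would now wait forever,
# even though its empty() check said an item was available.
print(saw_item, q.empty())
```

The check and the get are two separate operations, so nothing stops another consumer from running in the gap between them.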

So now 5886 blocks forever on that get. In general, when there are multiple consumers of a queue, you shouldn't use the empty() method to decide whether to make a blocking get() call, because it is vulnerable to exactly this race condition. The "poison pill"/sentinel approach you mentioned is the right way to handle the situation; alternatively, use a non-blocking get call and catch the Empty exception when it occurs:

import Queue  # the stdlib Queue module supplies the Empty exception

try:
    item = work_queue.get_nowait()
    print "[%d] processed work item %s" % (os.getpid(), item)
    s = '[%d] has processed %s.' % (os.getpid(), item)
    results_queue.put(s)
    work_queue.task_done()
    print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
except Queue.Empty:
    print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())

Note that you can only use this approach if you know that the queue's size will not grow once the workers start consuming it. Otherwise, a worker might decide it should exit while more items are still being added to the queue.
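Putting the non-blocking approach together, a minimal self-contained sketch in Python 3 syntax (where the stdlib module is named `queue` rather than `Queue`) might look like this; `run` and `item * 2` are my placeholders, not the original code:

```python
import multiprocessing
import queue  # in Python 2 this module is named Queue

def worker(work_queue, results_queue):
    # Keep trying a non-blocking get; Empty means the queue is drained.
    # There is no empty()-then-get() window for another consumer to exploit.
    while True:
        try:
            item = work_queue.get_nowait()
        except queue.Empty:
            break
        results_queue.put(item * 2)  # stand-in for real work

def run(n_procs=2, n_items=10):
    m = multiprocessing.Manager()
    work_queue = m.Queue()
    results_queue = m.Queue()
    # All work is enqueued before the workers start, so the queue
    # can only shrink once consumption begins (the precondition above).
    for x in range(n_items):
        work_queue.put(x)
    pool = multiprocessing.Pool(processes=n_procs)
    for _ in range(n_procs):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()
    pool.join()
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())
    return results

if __name__ == '__main__':
    print(sorted(run()))
```

Manager queue proxies re-raise the real queue.Empty from get_nowait(), which is why the except clause works across process boundaries.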