Python concurrency: hang when using apply_async()

I am trying to understand Python concurrency. As an experiment, I have the following program, which uses a process pool and calls workers via apply_async(). To share information between the processes (work items and results), I use queues from multiprocessing.Manager().
However, this code hangs - sometimes - once all items in the work queue have been processed, and I am not sure why. I have to run the program several times to observe the behavior.
As a side note, I can make this work correctly: I found a design pattern people sometimes call the "poison pill" approach, and it seems to work. (In my worker() method, I enter an infinite loop and break out of it when the work queue contains a sentinel value. I put as many sentinel values on the work queue as I have running processes.)
But I am still interested in finding out why this code hangs. I get output like the following (the process ID is in brackets):
Found 8 CPUs.
Operation queue has 20 items.
Will start 2 processes.
Joining pool...
[5885] entering worker() with work_queue size of 20
[5885] processed work item 0
[5885] worker() still running because work_queue has size 19
[5885] processed work item 1
[5885] worker() still running because work_queue has size 18
[5885] processed work item 2
[5885] worker() still running because work_queue has size 17
[5885] processed work item 3
[5885] worker() still running because work_queue has size 16
[5885] processed work item 4
[5885] worker() still running because work_queue has size 15
[5885] processed work item 5
[5886] entering worker() with work_queue size of 14
[5885] worker() still running because work_queue has size 14
[5886] processed work item 6
[5886] worker() still running because work_queue has size 13
[5885] processed work item 7
[5886] processed work item 8
[5885] worker() still running because work_queue has size 11
[5886] worker() still running because work_queue has size 11
[5885] processed work item 9
[5886] processed work item 10
[5885] worker() still running because work_queue has size 9
[5886] worker() still running because work_queue has size 9
[5885] processed work item 11
[5886] processed work item 12
[5885] worker() still running because work_queue has size 7
[5886] worker() still running because work_queue has size 7
[5885] processed work item 13
[5886] processed work item 14
[5885] worker() still running because work_queue has size 5
[5886] worker() still running because work_queue has size 5
[5885] processed work item 15
[5886] processed work item 16
[5885] worker() still running because work_queue has size 3
[5886] worker() still running because work_queue has size 3
[5885] processed work item 17
[5886] processed work item 18
[5885] worker() still running because work_queue has size 1
[5886] worker() still running because work_queue has size 1
[5885] processed work item 19
[5885] worker() still running because work_queue has size 0
[5885] worker() is finished; returning results of size 20
(The program hangs at the last line above; the other process - 5886 - never seems to finish.)
import multiprocessing
from multiprocessing import Pool
import os

# Python 2.7.6 on Linux

# Worker (consumer) process
def worker(work_queue, results_queue):
    print "[%d] entering worker() with work_queue size of %d" % (os.getpid(), work_queue.qsize())
    while not work_queue.empty():
        item = work_queue.get()
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())

if __name__ == '__main__':
    MAX_PROCESS = 2   # Max number of processes to create
    MAX_ITEMS = 20    # Max work items to process

    m = multiprocessing.Manager()
    results_queue = m.Queue()
    work_queue = m.Queue()

    # Assign work
    for x in xrange(MAX_ITEMS):
        work_queue.put(x)

    print "Found %d CPUs." % multiprocessing.cpu_count()
    print "Operation queue has %d items." % work_queue.qsize()
    print "Will start %d processes." % MAX_PROCESS

    # Pool method
    pool = Pool(processes=MAX_PROCESS)
    for n in range(0, MAX_PROCESS):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()
    print "Joining pool..."
    pool.join()
    print "Joining pool finished..."

    print "--- After pool completion ---"
    print "Work queue has %d items." % work_queue.qsize()
    print "Results queue has %d items." % results_queue.qsize()
    print "Results are:"
    while not results_queue.empty():
        item = results_queue.get()
        print str(item)
        results_queue.task_done()
    print "--End---"
Thanks for your help.