問題是,您將'Done'
視爲Queue
中的一個特例項目,這表示迭代應該停止。因此,如果您在示例中使用for循環遍歷Queue
,則返回的所有內容爲1
。但是,您聲稱Queue
的長度爲2.這是搞砸了map
代碼,該代碼依賴於該長度來準確表示迭代中的項目數,以便知道何時從所有結果返回工作人員:
class MapResult(ApplyResult):
def __init__(self, cache, chunksize, length, callback):
ApplyResult.__init__(self, cache, callback)
...
# _number_left is used to know when the MapResult is done
self._number_left = length//chunksize + bool(length % chunksize)
所以,你需要使長度實際上是準確的。你可以做到這一點的幾種方法,但我會建議不要求哨兵被加載到Queue
所有,並使用get_nowait
代替:
import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty
class I(queues.Queue):
def __init__(self, maxsize=0):
super(I, self).__init__(maxsize)
self.length = 0
... <snip>
def next(self):
try:
item = self.get_nowait()
except Empty:
raise StopIteration
return item
def thisworker(item):
print 'got this item: %s' % item
return item
q=I()
q.put(1)
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
此外,請注意,這種做法是不處理的安全。 length
屬性僅在單個進程中只有put
進入Queue
後纔會正確,然後在將Queue
發送到工作進程後再次不會執行put
。如果不調整導入和實現,它也不能在Python 3中工作,因爲multiprocessing.queues.Queue
的構造函數已更改。
相反子類multiprocessing.queues.Queue
的,我會建議使用iter
內置遍歷Queue
:
q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None) # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned
這將會對Python的所有版本的,要少得多的代碼,並且是過程安全。
編輯:
根據你在我的答案評論中提及的要求,我想你最好使用imap
代替map
,這樣你就不需要知道的長度根本就不是Queue
。現實情況是,你無法準確確定,實際上隨着迭代的進行,長度可能會增加。如果你使用imap
獨佔,然後做類似的東西,你原來的做法將正常工作:
import multiprocessing
class I(object):
def __init__(self, maxsize=0):
self.q = multiprocessing.Queue(maxsize)
def __getattr__(self, attr):
if hasattr(self.q, attr):
return getattr(self.q, attr)
def __iter__(self):
return self
def next(self):
item = self.q.get()
if item == 'Done':
raise StopIteration
return item
def thisworker(item):
if item == 1:
q.put(3)
if item == 2:
q.put('Done')
print 'got this item: %s' % item
return item
q=I()
q.put(1)
q.put(2)
q.put(5)
the_pool = multiprocessing.Pool(2) # 2 workers
print list(the_pool.imap(thisworker, q))
輸出:
got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]
我擺脫了那個擔心長度的代碼,以及用於委託代替的繼承,以獲得更好的Python 3.x兼容性。
請注意,只要您使用imap
而不是map
,我的原始建議(使用iter(q.get, <sentinel>)
)仍然可以在此使用。
非常好的解釋。但是,我對你的回答有進一步的懷疑。正如你在我的問題中可能已經注意到的那樣,這個方法的功能需要繼續追加到隊列中,直到滿足特定的條件。而且,還會有x個這樣的'thisworker'進程在運行。您的解決方案在這種情況下是否也適用? – GodMan 2014-10-16 14:36:48
@GodMan因此,'thisworker'除了消耗它之外,還會在'Queue'中添加項目? – dano 2014-10-16 14:49:19
@GodMan此外,'thisworker'只有一個實例需要在'Queue'中放入''Done'',以便它們全部停止工作? – dano 2014-10-16 14:56:52