2014-10-16 69 views
2
import multiprocessing.queues as queues 
import multiprocessing 
class I(queues.Queue): 
    def __init__(self, maxsize=0): 
     super(I, self).__init__(maxsize) 
     self.length = 0 

    def __iter__(self): 
     return self 

    def put(self, obj, block=True, timeout=None): 
     super(I, self).put(obj,block,timeout) 
     self.length += 1 

    def get(self, block = True, timeout = None): 
     self.length -= 1 
     return super(I, self).get(block, timeout) 

    def __len__(self): 
     return self.length 

    def next(self): 
     item = self.get() 
     if item == 'Done': 
      raise StopIteration 
     return item 


def thisworker(item): 
    print 'got this item: %s' % item 
    return item 

q=I() 

q.put(1) 
q.put('Done') 

the_pool = multiprocessing.Pool(1) 
print the_pool.map(thisworker, q) 

我想創建一個可用於多處理池映射的可迭代隊列。 的想法是,該功能thisworker會追加一些資料轉移到隊列中,直到一個條件得到滿足,然後退出將「完成」隊列後(我不是在這裏做了這個代碼還)Iterable multiprocessing Queue not exiting

但是,此代碼從未完成,它總是掛斷。

我無法調試真正的原因。 請求您的幫助

PS:我用self.length因爲map_async方法從the_pool.map下名爲需要使用迭代的長度,而形成一個變量:chunksize,這將被用來從池中獲取任務。

回答

1

問題是,您將'Done'視爲Queue中的一個特例項目,這表示迭代應該停止。因此,如果您在示例中使用for循環遍歷Queue,則返回的所有內容爲1。但是,您聲稱Queue的長度爲2.這是搞砸了map代碼,該代碼依賴於該長度來準確表示迭代中的項目數,以便知道何時從所有結果返回工作人員:

class MapResult(ApplyResult): 

    def __init__(self, cache, chunksize, length, callback): 
     ApplyResult.__init__(self, cache, callback) 
     ... 
     # _number_left is used to know when the MapResult is done 
     self._number_left = length//chunksize + bool(length % chunksize) 

所以,你需要使長度實際上是準確的。你可以做到這一點的幾種方法,但我會建議不要求哨兵被加載到Queue所有,並使用get_nowait代替:

import multiprocessing.queues as queues 
import multiprocessing 
from Queue import Empty 

class I(queues.Queue): 
    def __init__(self, maxsize=0): 
     super(I, self).__init__(maxsize) 
     self.length = 0 

    ... <snip> 

    def next(self): 
     try: 
      item = self.get_nowait() 
     except Empty: 
      raise StopIteration 
     return item 


def thisworker(item): 
    print 'got this item: %s' % item 
    return item 

q=I() 

q.put(1) 

the_pool = multiprocessing.Pool(1) 
print the_pool.map(thisworker, q) 

此外,請注意,這種做法是不處理的安全。 length屬性僅在單個進程中只有put進入Queue後纔會正確,然後在將Queue發送到工作進程後再次不會執行put。如果不調整導入和實現,它也不能在Python 3中工作,因爲multiprocessing.queues.Queue的構造函數已更改。

相反子類multiprocessing.queues.Queue的,我會建議使用iter內置遍歷Queue

q = multiprocessing.Queue() 
q.put(1) 
q.put(2) 
q.put(None) # None is our sentinel, you could use 'Done', if you wanted 
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned 

這將會對Python的所有版本的,要少得多的代碼,並且是過程安全。

編輯:

根據你在我的答案評論中提及的要求,我想你最好使用imap代替map,這樣你就不需要知道的長度根本就不是Queue。現實情況是,你無法準確確定,實際上隨着迭代的進行,長度可能會增加。如果你使用imap獨佔,然後做類似的東西,你原來的做法將正常工作:

import multiprocessing 

class I(object): 
    def __init__(self, maxsize=0): 
     self.q = multiprocessing.Queue(maxsize) 

    def __getattr__(self, attr): 
     if hasattr(self.q, attr): 
      return getattr(self.q, attr) 

    def __iter__(self): 
     return self 

    def next(self): 
     item = self.q.get() 
     if item == 'Done': 
      raise StopIteration 
     return item 


def thisworker(item): 
    if item == 1: 
     q.put(3) 
    if item == 2: 
     q.put('Done') 
    print 'got this item: %s' % item 
    return item 

q=I() 

q.put(1) 
q.put(2) 
q.put(5) 

the_pool = multiprocessing.Pool(2) # 2 workers 
print list(the_pool.imap(thisworker, q)) 

輸出:

got this item: 1 
got this item: 5 
got this item: 3 
got this item: 2 
[1, 2, 5, 3] 

我擺脫了那個擔心長度的代碼,以及用於委託代替的繼承,以獲得更好的Python 3.x兼容性。

請注意,只要您使用imap而不是map,我的原始建議(使用iter(q.get, <sentinel>))仍然可以在此使用。

+0

非常好的解釋。但是,我對你的回答有進一步的懷疑。正如你在我的問題中可能已經注意到的那樣,這個方法的功能需要繼續追加到隊列中,直到滿足特定的條件。而且,還會有x個這樣的'thisworker'進程在運行。您的解決方案在這種情況下是否也適用? – GodMan 2014-10-16 14:36:48

+0

@GodMan因此,'thisworker'除了消耗它之外,還會在'Queue'中添加項目? – dano 2014-10-16 14:49:19

+0

@GodMan此外,'thisworker'只有一個實例需要在'Queue'中放入''Done'',以便它們全部停止工作? – dano 2014-10-16 14:56:52

相關問題