2017-07-18 117 views
2

我實際上很難相信我遇到了問題,我有,它似乎是一個在Python多處理模塊中的大錯誤。無論如何,我遇到的問題是,只要我將multiprocessing.Queue傳遞給multiprocessing.Pool worker作爲參數,池工作人員就不會執行其代碼。即使是在python docs中找到的示例代碼的稍微修改版本的非常簡單的測試中,我也能夠重現此錯誤。multiprocessing.Queue作爲參數池的工人中止工作人員的執行

下面是示例代碼隊列的原始版本:

from multiprocessing import Process, Queue 

def f(q): 
    q.put([42, None, 'hello']) 


if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    print(q.get()) # prints "[42, None, 'hello']" 
    p.join() 

這裏是我的隊列的示例代碼修改後的版本:

from multiprocessing import Queue, Pool 

def f(q): 
    q.put([42, None, 'hello']) 

if __name__ == '__main__': 
    q = Queue() 
    p = Pool(1) 
    p.apply_async(f,args=(q,)) 
    print(q.get()) # prints "[42, None, 'hello']" 
    p.close() 
    p.join() 

所有我做的是使PA大小爲1的進程池,而不是多進程。進程對象,結果是代碼永遠掛在打印語句上,因爲沒有任何內容寫入隊列!當然,我測試它的原始形式,它工作正常。我的操作系統是Windows 10,我的Python版本是3.5.x,任何人都有任何想法,爲什麼發生這種情況?

更新:仍然不知道爲什麼這個示例代碼與multiprocessing.process,而不是一個multiprocessing.Pool,但我發現work around我滿足(亞歷克斯Martelli的答案)。顯然你可以創建一個多處理的全局列表。並且要傳遞每個進程和索引來使用,我將避免使用一個受管隊列,因爲它們比較慢。感謝Guest向我展示鏈接。

+1

你可能想看看[#1](https://stackoverflow.com/a/30039159/3767239),[# 2](https://stackoverflow.com/q/3217002/3767239),[#3](https://stackoverflow.com/q/9908781/3767239),[#4](https://stackoverflow.com/a/42659752/3767239),[#5](https://stackoverflow.com/a/25558333/3767239)。看起來原因是'Queue'實例不能被醃製。但是我不明白爲什麼這會使流程的底層隊列陷入僵局。使用池大小爲2的'Ctrl + C'顯示它被卡在'task = inqueue.get()'它將請求目標函數的地方。這有點令人費解。 –

+0

請注意,對於異步編程,您不需要手動處理結果隊列 - apply_async會返回['AsyncResult'](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool .AsyncResult)可以用來得到結果的實例:'result.get()'。這使用了一個基礎結果(out-)隊列,所以你只需要在你的目標函數中「返回」。同樣,如果你使用'result.get()',並且你將'Queue'實例作爲參數傳遞給目標函數,它將引發'RuntimeError'。不過,我很好奇爲什麼這不會發生在你的例子。 –

+0

看到我的評論給你的答案。我的目標不是「結果隊列」,這僅僅是一個簡單的例子。我需要一個連續寫入和處理的隊列。 – profPlum

回答

0

問題

當你調用apply_async它返回一個AsyncResult對象和葉工作量分配到一個單獨的線程(見this answer)。此線程遇到Queue對象不能爲pickled的問題,因此所請求的工作不能分發(並最終執行)。我們可以通過調用AsyncResult.get看到這一點:

r = p.apply_async(f,args=(q,)) 
r.get() 

這引起了RuntimeError

RuntimeError: Queue objects should only be shared between processes through inheritance 

然而,這RuntimeError是在主線程中只是提出一旦申請結果,因爲它實際上是發生在不同的線程(並因此需要傳輸的方式)。

當你做

p.apply_async(f,args=(q,)) 

是目標函數f永遠不會被調用,因爲它的一個參數(q)不能醃製會發生什麼。因此q永遠不會收到一個項目並保持爲空,因此在主線程中調用q.get將永遠阻止。

解決方案

隨着apply_async您不必手動管理的結果隊列,但他們很容易在AsyncResult對象的形式提供給您。所以,你可以修改代碼來簡單地從目標函數返回:

from multiprocessing import Queue, Pool 

def f(): 
    return [42, None, 'hello'] 

if __name__ == '__main__': 
    q = Queue() 
    p = Pool(1) 
    result = p.apply_async(f) 
    print(result.get()) 
+0

有趣但我不明白代碼是如何工作的,當你只使用multiprocessing.Process而不是multiprocessing.Pool時,他們都創建新的流程,所以不需要爲兩種方法對Queue進行pickle?另外使用AsyncResult解決方法對我來說不是真正可行的,因爲我需要一堆工作進程持續寫入隊列,然後由另一個工作進程讀取並處理該隊列。 – profPlum

+0

@profPlum對於這些問題,我想參考[這個答案](https://stackoverflow.com/a/45184127/3767239)。其實質是'Pool'在收到'Queue'實例之後立即啓動進程,而'Process'開始啓動(此處不需要酸洗,進程尚未運行)。您可以使用['Manager'](https://docs.python.org/3/library/multiprocessing.html#managers)在進程之間共享對象,或者使用[''initializer''和'initargs'關鍵字參數[ Pool'](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool)。 –

+0

啊確定不確定爲什麼這個過程已經運行的事實需要酸洗,但有關於初始化和初始化的解釋和信息我會接受你的回答 – profPlum