2016-04-01 55 views
2

在python2.7中,multiprocessing.Queue從函數內部初始化時會拋出一個錯誤的錯誤。我提供了一個重現問題的最小例子。多處理損壞的管道錯誤.Queue

#!/usr/bin/python 
# -*- coding: utf-8 -*- 

import multiprocessing 

def main(): 
    q = multiprocessing.Queue() 
    for i in range(10): 
     q.put(i) 

if __name__ == "__main__": 
    main() 

拋出下面的破管錯誤

Traceback (most recent call last): 
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed 
send(obj) 
IOError: [Errno 32] Broken pipe 

Process finished with exit code 0 

我無法破譯原因。如果我們不能從函數內部填充Queue對象,那肯定會很奇怪。

回答

3

這裏發生的是當您撥打main()時,它創建Queue,放入10個對象並結束該函數,垃圾收集其所有內部變量和對象,包括Queue。 但是你得到這個錯誤,因爲你仍然試圖發送Queue中的最後一個數字。

「當一個進程首先將在隊列中的進料器線程 開始從緩衝器到所述管傳送對象項目」:

從文檔 documentation

作爲put()在另一個線程中取得,但並不阻止腳本的執行,並允許在完成隊列操作之前結束main()功能。

試試這個:

#!/usr/bin/python 
# -*- coding: utf-8 -*- 

import multiprocessing 
import time 
def main(): 
    q = multiprocessing.Queue() 
    for i in range(10): 
     print i 
     q.put(i) 
    time.sleep(0.1) # Just enough to let the Queue finish 

if __name__ == "__main__": 
    main() 

應該有辦法join隊列或塊執行,直到目標被放在Queue,你應該採取文檔中看看。

+0

精彩的回答。我要給另一個說明,在python3中這不會發生。 – hAcKnRoCk

0

延遲使用time.sleep(0.1)按照@HarryPotFleur的建議,解決了這個問題。但是,我用python3測試了代碼,並且在python3中根本沒有發生管道問題。我認爲這被報告爲一個錯誤,後來得到修復。

+0

這是**不正確**,它不會發生在python3中。還有什麼更多 'time.sleep(0.1)'沒有解決!這只是爲了理解! –

1

當啓動Queue.put()時,啓動隱式線程將數據傳遞到隊列。同時,主應用程序結束,數據沒有結束站(隊列對象被垃圾收集)。

我想試試這個:

from multiprocessing import Queue 

def main(): 
    q = Queue() 
    for i in range(10): 
     print i 
     q.put(i) 
    q.close() 
    q.join_thread() 

if __name__ == "__main__": 
    main() 

join_thread()保證,緩衝區中的所有數據已被刷新。 close()必須先撥打join_thread()

相關問題