2014-10-04 280 views
1

當使用multiprocessing.JoinableQueue產生進程的線程時,我得到BrokenPipeError。這似乎發生在程序完成工作並嘗試退出之後,因爲它確實是應該做的。這是什麼意思,有沒有辦法解決這個/安全忽略?Python多線程+多處理BrokenPipeError(子進程不退出?)

import requests 
import multiprocessing 
from multiprocessing import JoinableQueue 
from queue import Queue 
import threading 


class ProcessClass(multiprocessing.Process): 
    def __init__(self, func, in_queue, out_queue): 
     super().__init__() 
     self.in_queue = in_queue 
     self.out_queue = out_queue 
     self.func = func 

    def run(self): 
     while True: 
      arg = self.in_queue.get() 
      self.func(arg, self.out_queue) 
      self.in_queue.task_done() 


class ThreadClass(threading.Thread): 
    def __init__(self, func, in_queue, out_queue): 
     super().__init__() 
     self.in_queue = in_queue 
     self.out_queue = out_queue 
     self.func = func 

    def run(self): 
     while True: 
      arg = self.in_queue.get() 
      self.func(arg, self.out_queue) 
      self.in_queue.task_done() 


def get_urls(host, out_queue): 
    r = requests.get(host) 
    out_queue.put(r.text) 
    print(r.status_code, host) 


def get_title(text, out_queue): 
    print(text.strip('\r\n ')[:5]) 


if __name__ == '__main__': 
    def test(): 

     q1 = JoinableQueue() 
     q2 = JoinableQueue() 

     for i in range(2): 
      t = ThreadClass(get_urls, q1, q2) 
      t.daemon = True 
      t.setDaemon(True) 
      t.start() 

     for i in range(2): 
      t = ProcessClass(get_title, q2, None) 
      t.daemon = True 
      t.start() 

     for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",): 
      q1.put(host) 

     q1.join() 
     q2.join() 

    test() 
    print('Finished') 

程序輸出:

200 http://ibm.com 
<!DOC 
200 http://google.com 
<!doc 
200 http://yahoo.com 
<!DOC 
200 http://apple.com 
<!DOC 
200 http://amazon.com 
<!DOC 
Finished 
Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "C:\Python\33\lib\multiprocessing\connection.py", line 313, in _recv_bytes 
    nread, err = ov.GetOverlappedResult(True) 
BrokenPipeError: [WinError 109] 

The pipe has been ended 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "C:\Python\33\lib\threading.py", line 901, in _bootstrap_inner 
    self.run() 
    File "D:\Progs\Uspat\uspat\spider\run\threads_test.py", line 31, in run 
    arg = self.in_queue.get() 
    File "C:\Python\33\lib\multiprocessing\queues.py", line 94, in get 
    res = self._recv() 
    File "C:\Python\33\lib\multiprocessing\connection.py", line 251, in recv 
    buf = self._recv_bytes() 
    File "C:\Python\33\lib\multiprocessing\connection.py", line 322, in _recv_bytes 
    raise EOFError 
EOFError 
.... 

(切成同樣的錯誤其他線程。)

如果我切換到JoinableQueue爲queue.Queue多線程的一部分,一切修復,但爲什麼呢?

回答

2

發生這種情況,因爲你要離開了後臺線程在multiprocessing.Queue.get調用主線程退出時阻塞,但它只能在一定的條件下發生的:

  1. 守護線程運行和阻塞上multiprocessing.Queue.get當主線程退出時。
  2. A multiprocessing.Process正在運行。
  3. multiprocessing上下文不是'fork'

唯一的例外是告訴你的是,Connection的另一端,該multiprocessing.JoinableQueue是聽時,其一個get()調用內發送的EOF。通常這意味着Connection的另一端已關閉。在關閉期間發生這種情況是有道理的 - 在退出解釋器之前,Python正在清理所有對象,部分清理操作包括關閉所有打開的Connection對象。我還沒有弄清楚的是,爲什麼只有(並且總是)如果產生了multiprocessing.Process而不是分叉,這就是爲什麼默認情況下它不會在Linux上發生)並且仍在運行。我甚至可以重現它,如果我創建一個multiprocessing.Process,只是睡在while循環。它根本不需要任何Queue對象。無論出於何種原因,運行中產生的子進程似乎都能保證異常會被提出。它可能只是簡單地導致事情被破壞的順序恰好適合競爭條件的發生,但這是一種猜測。

在任何情況下,使用的queue.Queue,而不是multiprocessing.JoinableQueue是修復的好方法,因爲你實際上並不需要multiprocessing.Queue那裏。您還可以通過發送標記到隊列中來確保後臺線程和/或後臺進程在主線程之前關閉。因此,使雙方run方法檢查前哨:

def run(self): 
    for arg in iter(self.in_queue.get, None): # None is the sentinel 
     self.func(arg, self.out_queue) 
     self.in_queue.task_done() 
    self.in_queue.task_done() 

,然後發送哨兵當你做:

threads = [] 
    for i in range(2): 
     t = ThreadClass(get_urls, q1, q2) 
     t.daemon = True 
     t.setDaemon(True) 
     t.start() 
     threads.append(t) 

    p = multiprocessing.Process(target=blah) 
    p.daemon = True 
    p.start() 
    procs = [] 
    for i in range(2): 
     t = ProcessClass(get_title, q2, None) 
     t.daemon = True 
     t.start() 
     procs.append(t) 

    for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",): 
     q1.put(host) 

    q1.join() 
    # All items have been consumed from input queue, lets start shutting down. 
    for t in procs: 
     q2.put(None) 
     t.join() 
    for t in threads: 
     q1.put(None) 
     t.join() 
    q2.join() 
+0

謝謝你一個完整的答案(一個側面說明,或許有幫助的人:我想使用multiprocessing.JoinableQueue而不是queue.Queue也能夠將來自多處理部分的參數傳遞迴應用程序的多線程部分,儘管在上例中沒有這樣的代碼) – Bob 2014-10-04 06:35:56