當使用multiprocessing.JoinableQueue產生進程的線程時,我得到BrokenPipeError。這似乎發生在程序完成工作並嘗試退出之後,因爲它確實是應該做的。這是什麼意思,有沒有辦法解決這個/安全忽略?Python多線程+多處理BrokenPipeError(子進程不退出?)
import requests
import multiprocessing
from multiprocessing import JoinableQueue
from queue import Queue
import threading
class ProcessClass(multiprocessing.Process):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.in_queue = in_queue
self.out_queue = out_queue
self.func = func
def run(self):
while True:
arg = self.in_queue.get()
self.func(arg, self.out_queue)
self.in_queue.task_done()
class ThreadClass(threading.Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.in_queue = in_queue
self.out_queue = out_queue
self.func = func
def run(self):
while True:
arg = self.in_queue.get()
self.func(arg, self.out_queue)
self.in_queue.task_done()
def get_urls(host, out_queue):
r = requests.get(host)
out_queue.put(r.text)
print(r.status_code, host)
def get_title(text, out_queue):
print(text.strip('\r\n ')[:5])
if __name__ == '__main__':
def test():
q1 = JoinableQueue()
q2 = JoinableQueue()
for i in range(2):
t = ThreadClass(get_urls, q1, q2)
t.daemon = True
t.setDaemon(True)
t.start()
for i in range(2):
t = ProcessClass(get_title, q2, None)
t.daemon = True
t.start()
for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
q1.put(host)
q1.join()
q2.join()
test()
print('Finished')
程序輸出:
200 http://ibm.com
<!DOC
200 http://google.com
<!doc
200 http://yahoo.com
<!DOC
200 http://apple.com
<!DOC
200 http://amazon.com
<!DOC
Finished
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\Python\33\lib\multiprocessing\connection.py", line 313, in _recv_bytes
nread, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109]
The pipe has been ended
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Python\33\lib\threading.py", line 901, in _bootstrap_inner
self.run()
File "D:\Progs\Uspat\uspat\spider\run\threads_test.py", line 31, in run
arg = self.in_queue.get()
File "C:\Python\33\lib\multiprocessing\queues.py", line 94, in get
res = self._recv()
File "C:\Python\33\lib\multiprocessing\connection.py", line 251, in recv
buf = self._recv_bytes()
File "C:\Python\33\lib\multiprocessing\connection.py", line 322, in _recv_bytes
raise EOFError
EOFError
....
(切成同樣的錯誤其他線程。)
如果我切換到JoinableQueue爲queue.Queue多線程的一部分,一切修復,但爲什麼呢?
謝謝你一個完整的答案(一個側面說明,或許有幫助的人:我想使用multiprocessing.JoinableQueue而不是queue.Queue也能夠將來自多處理部分的參數傳遞迴應用程序的多線程部分,儘管在上例中沒有這樣的代碼) – Bob 2014-10-04 06:35:56