我試圖在Python 2.7中使用 Queue.Queue實現多線程生產者 - 消費者模式。我試圖弄清楚如何讓 使用者(即工作者線程)在完成所有必需的工作後停止。如何在多線程生產者 - 使用者模式下完成工作線程後退出工作線程?
見馬丁詹姆斯第二評論這個答案:https://stackoverflow.com/a/19369877/1175080
發送「我完了」的任務,指示池中的線程終止。任何得到這樣的任務的線程都會重新執行它,然後自殺。
但這不適用於我。例如,請參閱以下代碼。
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
該程序掛起所有三個工人已經閱讀出口指示燈, 即-1
從隊列後,因爲每個工人重新排隊-1
前 退出,所以隊列永遠不會成爲空和q.join()
永遠不會返回。
我想出了以下但醜陋的解決方案,我通過隊列發送-1
出口 指標爲每名工人,使每個工人可以看到它 和自殺。但是,我必須爲每個工作人員發送退出指示 ,這一事實感覺有點難看。
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send one stop indicator for each worker.
for i in range(3):
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
我有兩個問題。
- 是否可以爲所有線程發送單個退出指示符的方法(如Martin James的https://stackoverflow.com/a/19369877/1175080的第二條評論所述)甚至可以工作?
- 如果上一個問題的答案是「否」,是否有辦法解決問題,我不必爲每個工作線程發送單獨的退出指示符?
發送每個工作看起來像一個很好的解決方案爲我的殺人信號,我不會說是如此醜陋。您也可以加入線程而不是加入隊列 – Netwave
請注意,有一個['ThreadPool'](https://stackoverflow.com/a/3386632/3767239)類可用,它負責「手動」分配任務在多個線程之間。你可以'加入'這樣的池(而不是隊列),然後發送*「停止」*信號將最終終止所有線程。其實我不明白你爲什麼要「加入」隊列而不是線程。使用Python 3,你可以通過[concurrent](https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor)模塊獲得更多的功能(並且更好的記錄)。 –
一些額外的評論。從你的示例代碼中不清楚爲什麼你會首先使用這樣一個*「stop」*命令(你可以把這個部分放出來,等待隊列加入)。然後 - 如果你使用了這樣的命令 - 不能保證每個線程都會「正常」關閉:'q.join()'可能會在所有線程收到'-1'之前恢復,因爲你調用了'q'。在重新將'-1'加入到隊列之前task_done()'(這意味着在重新放置'-1'之前任務計數可以達到零(這增加了計數)並且因此'q.join()'可以恢復)。 –