來自python的隊列對多線程來說是實用的,但是它不支持在隊列爲空時無限期地停止工作線程。是python的隊列不完整還是我的設計有缺陷
例如,考慮一下:
queue = Queue()
def process(payload):
time.sleep(random())
def work(item):
while(True):
payload = queue.get()
try:
process(payload)
except:
print("TERROR ERROR!")
finally:
queue.task_done()
threads = dict()
for thread_id in range(10):
threads[thread_id] = Thread(target=work)
threads[thread_id].deamon = True
threads[thread_id].start()
for payload in range(100):
queue.put(payload)
queue.join();
所以這偉大的工程,但不是真的。 queue.join()等待所有要報告的項目完成,然後主線程完成,但工作線程將無限期地等待。如果這將是(unix)進程的結束,當然,我們可以將它留給操作系統,但如果它繼續下去,會有這些等待線程溢出資源。
然後我們實行定點,EOQ,或底部或任何你想調用它:
class Sentinel:
def __init__(self):
pass
def work(item):
while(True):
payload = queue.get()
if type(payload) == Sentinel:
queue.task_done()
break
# ...
threads = dict()
# ...
for thread_id in threads:
queue.put(Sentinel())
queue.join();
這是一個更好的解決方案,因爲線程停下來。然而,注射哨兵的代碼笨拙,容易出錯。考慮一下,我不小心把它放在那裏,或者一個工作線程意外地處理了兩個線程,這樣其他的工作線程就不會得到他們的線程。
或者:
class FiniteQueue(Queue):
def __init__(self, ....)
super() .__init__(....)
self.finished = False
def put(self, item, ...):
if self.finished:
raise AlreadyFinished()
super().put(item, ...)
def set_finished(self):
self.finished=True
def get(self, ...):
if self.finished:
raise AlreadyFinished()
return super().get(....)
很顯然,我是個懶人,並沒有使put()方法是線程安全的,不過這是非常有可能的事情。這樣工作人員可以簡單地捕獲AlreadyFinished對象,然後停下來。
當所有有效載荷輸入時,主隊列可以簡單地應用set_finished()。然後,隊列可以檢測何時不會獲得更多有效載荷,並將其報告給工作人員(或者如果您願意的話)。
爲什麼python隊列不提供set_finished()功能?它不會干擾endless_queue用例,但支持有限的處理流水線。
我錯過了這個設計中的一個明顯的錯誤?這是不應該要的東西嗎?是否有更簡單的替代方案提供FiniteQueue?
你想要的是通常被稱爲易破隊列。這不是不合理的,但它不是標準隊列中常見的功能。 (例如,在引入標準隊列後,Perl增加了十多年的可破壞性。)人們使用標記,繁忙的輪詢循環等來實現相同的效果。設計程序也是適當的,因此消費者的關機是無關緊要的。事實上,python(自2.5開始)通過非標準的隊列擴展'task_done'和'join'使這很容易 - 現在你的製作者可以確保所有的工作都完成了。 – pilcrow