2015-06-21 22 views
1

來自python的隊列對多線程來說是實用的,但是它不支持在隊列爲空時無限期地停止工作線程。是python的隊列不完整還是我的設計有缺陷

例如,考慮一下:

queue = Queue() 

def process(payload): 
    time.sleep(random()) 

def work(item): 
    while(True): 
    payload = queue.get() 
    try: 
     process(payload) 
    except: 
     print("TERROR ERROR!") 
    finally: 
     queue.task_done() 

threads = dict() 
for thread_id in range(10): 
    threads[thread_id] = Thread(target=work) 
    threads[thread_id].deamon = True 
    threads[thread_id].start() 

for payload in range(100): 
    queue.put(payload) 

queue.join(); 

所以這偉大的工程,但不是真的。 queue.join()等待所有要報告的項目完成,然後主線程完成,但工作線程將無限期地等待。如果這將是(unix)進程的結束,當然,我們可以將它留給操作系統,但如果它繼續下去,會有這些等待線程溢出資源。

然後我們實行定點,EOQ,或底部或任何你想調用它:

class Sentinel: 
    def __init__(self): 
    pass 

def work(item): 
    while(True): 
    payload = queue.get() 
    if type(payload) == Sentinel: 
     queue.task_done() 
     break 
    # ... 

threads = dict() 
# ... 

for thread_id in threads: 
    queue.put(Sentinel()) 
queue.join(); 

這是一個更好的解決方案,因爲線程停下來。然而,注射哨兵的代碼笨拙,容易出錯。考慮一下,我不小心把它放在那裏,或者一個工作線程意外地處理了兩個線程,這樣其他的工作線程就不會得到他們的線程。

或者:

class FiniteQueue(Queue): 
    def __init__(self, ....) 
    super() .__init__(....) 
    self.finished = False 

    def put(self, item, ...): 
    if self.finished: 
     raise AlreadyFinished() 
    super().put(item, ...) 

    def set_finished(self): 
    self.finished=True 

    def get(self, ...): 
    if self.finished: 
     raise AlreadyFinished() 
    return super().get(....) 

很顯然,我是個懶人,並沒有使put()方法是線程安全的,不過這是非常有可能的事情。這樣工作人員可以簡單地捕獲AlreadyFinished對象,然後停下來。

當所有有效載荷輸入時,主隊列可以簡單地應用set_finished()。然後,隊列可以檢測何時不會獲得更多有效載荷,並將其報告給工作人員(或者如果您願意的話)。

爲什麼python隊列不提供set_finished()功能?它不會干擾endless_queue用例,但支持有限的處理流水線。

我錯過了這個設計中的一個明顯的錯誤?這是不應該要的東西嗎?是否有更簡單的替代方案提供FiniteQueue?

+0

你想要的是通常被稱爲易破隊列。這不是不合理的,但它不是標準隊列中常見的功能。 (例如,在引入標準隊列後,Perl增加了十多年的可破壞性。)人們使用標記,繁忙的輪詢循環等來實現相同的效果。設計程序也是適當的,因此消費者的關機是無關緊要的。事實上,python(自2.5開始)通過非標準的隊列擴展'task_done'和'join'使這很容易 - 現在你的製作者可以確保所有的工作都完成了。 – pilcrow

回答

0

爲了解決哨兵問題,我把工號與哨兵號碼相同,因爲有工作線程。如果工作人員檢測到線程,則退出,因此不可能有重複。作爲哨兵,我使用了一個函數的引用 - 這是從未被調用的。

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 
# 
# test_queue.py 
# 
# Copyright 2015 John Coppens <[email protected]> 
# 
# This program is free software; you can redistribute it and/or modify 
# it under the terms of the GNU General Public License as published by 
# the Free Software Foundation; either version 2 of the License, or 
# (at your option) any later version. 
# 
# This program is distributed in the hope that it will be useful, 
# but WITHOUT ANY WARRANTY; without even the implied warranty of 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
# GNU General Public License for more details. 
# 
# You should have received a copy of the GNU General Public License 
# along with this program; if not, write to the Free Software 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
# MA 02110-1301, USA. 
# 
# 

from Queue import Queue 
from threading import Thread 
import time, random 

NR_WORKERS = 10 
queue = Queue() 

# The sentinel function below is bot necessary - you can use None as sentinel, 
# though this function might be useful if the worker wants to do something 
# after the last job (like wash his hands :) (Thanks Darren Ringer) 

# def sentinel(): 
# pass 

def process(payload): 
    time.sleep(random.random()) 
    print payload 

def work(): 
    while(True): 
     payload = queue.get() 
     print "Got from queue: ", payload 
     #if payload == sentinel: 
     if payload == None:   # See comment above 
      queue.task_done() 
      break 
     process(payload) 
     queue.task_done() 

threads = dict() 
for id in range(NR_WORKERS): 
    print "Creating worker ", id 
    threads[id] = Thread(target=work) 
    #threads[id].deamon = False 
    threads[id].start() 

for payload in range(100): 
    queue.put(payload) 

for stopper in range(NR_WORKERS): 
    # queue.put(sentinel)    # See comment at def sentinel() 
    queue.put(None) 

queue.join(); 

編輯:感謝@DarrenRinger建議使用無。我曾嘗試過,但它失敗(因爲另一個問題,我懷疑)

+1

更簡單,我已經看到None本身在很多代碼中用作定點值。 –

+0

@DarrenRinger你知道,我一開始沒有試過,但有一個問題 - 可能是由於其他問題。謝謝 - 我會將其添加到回覆中 - 當然,請參考。 – jcoppens

0

你的方法與哨兵對象是正確的,很好。 worker線程accidentally process twoSentinel對象是不可能的,因爲當它找到其中一個對象時,它的處理週期會中斷。

FiniteQueue方法不起作用,因爲設置finished標誌不會喚醒工人,在語句super().get(....)處被阻止。

這是大多數編程語言的常見問題,它支持線程:一次阻止兩個條件的等待。在你的情況,get()方法應該等待:

1)隊列變爲非空,

2)或終點標誌設置爲true

爲是正確的,等待的方法必須意識到兩者的條件。這使得使用易於支持wait的現成對象變得更加困難。有些語言支持某種線程中斷,它會喚醒被阻塞的線程。 Python似乎缺乏這樣的機制。

+0

工作人員有可能通過bug來處理,例如通過不打破外部循環而是打破一些內部循環。然而,你在FiniteQueue中是正確的,當在Queue本身中實現時,可以執行notify_all(假設Queue使用條件變量而不是互斥體)。喚醒意識到完成的標誌有什麼問題?即self.empty而不self.finished:self.cv.wait()。 – Herbert

+0

'喚醒瞭解已完成標誌的問題是什麼?' - 問題在於隊列的用途太多:例如,可能需要另一個用於終止put()的標誌等待。考慮到'finished'標誌需要'Queue'接口和實現的複雜性:get()應該返回特殊值或在'finished'標誌的情況下拋出異常。實際上,您可以實現您的隊列變體,並提供您所需的全部功能。這不是一項艱苦的工作。 – Tsyvarev

0

我建議使用Event對象而不是哨兵來避免混淆數據和哨兵。另外請確保你實施阻止queue.get()和超時不浪費資源。

from __future__ import print_function 
from threading import Thread, Event, current_thread 
from Queue import Queue import time 

queue = Queue() 
evt = Event() 

def process(payload): 
    time.sleep(1) 

def work(): 
    tid = current_thread().name 
    # try until signaled not to 
    while(not evt.is_set()): 
     try: 
     # block for max 1 second 
     payload = queue.get(True, 1) 
     process(payload) 
     except Exception as e: 
     print("%s thread exception %s" % (tid, e)) 
     else: 
     # calling task_done() in finally may cause too many calls 
     # resulting in an exception -- only call it once a task has actually been done 
     queue.task_done() 

threads = dict() 
for thread_id in range(10): 
    threads[thread_id] = Thread(target=work)     
    threads[thread_id].deamon = True 
    threads[thread_id].start() 
for payload in range(10): 
    queue.put(payload) 
queue.join() 
# all workers will end in approx 1 second at most 
evt.set() 
+1

隨着超時的get()簡單標誌就足夠了,事件就不需要了。 – Tsyvarev

相關問題