2012-07-02 168 views
7

我需要知道Queue何時關閉,並且不會有更多項目,因此我可以結束迭代。Python迭代隊列

我做到了,通過把一個哨兵在隊列:

from Queue import Queue 

class IterableQueue(Queue): 

    _sentinel = object() 

    def __iter__(self): 
     return self 

    def close(self): 
     self.put(self._sentinel) 

    def next(self): 
     item = self.get() 
     if item is self._sentinel: 
      raise StopIteration 
     else: 
      return item 

考慮到這是一個隊列中的使用非常普遍,是不是有什麼內置執行?

+0

我既可以使用定點或標誌線內停止在隊列中的迭代。對於以後,我通常會等待一段時間。 – jdi

回答

10

前哨爲製片人發送一條消息,沒有更多的隊列中的任務即將到來的合理方法。

FWIW,你的代碼可以簡化與iter()兩個參數的形式頗有幾分:

from Queue import Queue 

class IterableQueue(Queue): 

    _sentinel = object() 

    def __iter__(self): 
     return iter(self.get, self._sentinel) 

    def close(self): 
     self.put(self._sentinel) 
4

多處理模塊具有其自己的版本Queue,其確實包括close方法。我不確定它在線程中是如何工作的,但它值得一試。我不明白爲什麼它不應該工作相同:

from multiprocessing import Queue 

q = Queue() 
q.put(1) 
q.get_nowait() 
# 1 
q.close() 
q.get_nowait() 
# ... 
# IOError: handle out of range in select() 

您可以只捕獲IOError作爲關閉信號。

TEST

from multiprocessing import Queue 
from threading import Thread 

def worker(q): 
    while True: 
     try: 
      item = q.get(timeout=.5) 
     except IOError: 
      print "Queue closed. Exiting thread." 
      return 
     except: 
      continue 
     print "Got item:", item 

q = Queue() 
for i in xrange(3): 
    q.put(i) 
t = Thread(target=worker, args=(q,)) 
t.start() 
# Got item: 0 
# Got item: 1 
# Got item: 2 
q.close() 
# Queue closed. Exiting thread. 

雖然說實話,它沒有太多的比對Queue.Queue設置一個標誌不同。該multiprocessing.Queue只是使用一個封閉的文件描述符作爲一個標誌:

from Queue import Queue 

def worker2(q): 
    while True: 
     if q.closed: 
      print "Queue closed. Exiting thread." 
      return 
     try: 
      item = q.get(timeout=.5) 
     except: 
      continue 
     print "Got item:", item 

q = Queue() 
q.closed = False 
for i in xrange(3): 
    q.put(i) 
t = Thread(target=worker2, args=(q,)) 
t.start() 
# Got item: 0 
# Got item: 1 
# Got item: 2 
q.closed = True 
# Queue closed. Exiting thread.