2016-07-20 36 views
1

我想實現一個Queue處理線程在python如下:。加入可能的空隊列在python

from queue import Queue 
from threading import Thread 
import sys  

class Worker(Thread): 
    def __init__(self, queue): 
     # Call thread constructor 
     self.queue = queue 

    def run(self): 
     while True: 
      task = self.queue.get() 
      # doTask() 
      self.queue.task_done() 

queue = Queue() 
thread = Worker(thread) 
thread.start() 

while True: 
    inp = user_input() 

    if condition(inp): 
     queue.put(sometask()) 
    else: 
     queue.join() 
     thread.join() 
     sys.exit(0) 

在這個例子中,假設用戶決定exit不添加到隊列中的任何項目。然後我的線程將被阻止在self.queue.get和我queue.join()將無法​​正常工作。因此,我無法執行正確的exit

我該如何處理這個問題?

回答

1

你可以給Queue.get超時和使用停止事件:

from Queue import Queue, Empty 
from threading import Thread, Event 
import sys 

class Worker(Thread): 
    def __init__(self, queue, stop): 
     # Call thread constructor 
     self.queue = queue 
     self.stop = stop 
     super(Worker, self).__init__() 

    def run(self): 
     while not self.stop.is_set(): 
      try: 
       task = self.queue.get(timeout=1) 
      except Empty: 
       continue 
      # doTask() 
      self.queue.task_done() 

queue = Queue() 
stop = Event() 
thread = Worker(queue, stop) 
thread.start() 

while True: 
    inp = raw_input() 

    if inp: 
     queue.put(inp) 
    else: 
     stop.set() 
     queue.join() 
     thread.join() 
     sys.exit(0) 

這增加了一個條件線程工人的while循環,這樣就可以阻止它時。您必須給Queue.get超時,以便它可以定期檢查停止事件。

更新

您可以使用一個哨兵,而不是超時:

from Queue import Queue 
from threading import Thread 
import sys 

_sentinel = Object() 

class Worker(Thread): 
    def __init__(self, queue, sentinel=None): 
     # Call thread constructor 
     self.queue = queue 
     self.sentinel = sentinel 
     super(Worker, self).__init__() 

    def run(self): 
     while True: 
      task = self.queue.get() 
      if task is self.sentinel: 
       self.queue.task_done() 
       return 
      # doTask() 
      self.queue.task_done() 


queue = Queue() 
thread = Worker(queue, sentinel=_sentinel) 
thread.start() 

while True: 
    inp = raw_input() 

    if inp: 
     queue.put(inp) 
    else: 
     queue.put(_sentinel) 
     queue.join() 
     thread.join() 
     sys.exit(0) 

感謝Bakuriu爲定點=對象()的建議。

+0

這是一個好主意。但是,我將不得不在每個1s迭代循環。我可以觸發一些可以動態解鎖'queue.get'的事件嗎? – niyasc

+0

@niyasc你指的是哪一個循環?線程工作循環或將項目放入隊列的循環?爲什麼你需要迭代它每1秒? – FamousJameous

+0

我指的是'Worker'類中的循環。按照這個實現,'get'方法將在'1s'中超時,並且它將用於下一次迭代。不是嗎? – niyasc