2012-06-24 60 views
0

我有python多線程隊列的問題。我有這個劇本,其中生產者採取從輸入隊列中的元素,會產生一些元素,將它們放置到輸出隊列中,消費者需要元素從輸出隊列,只是將它們打印:python隊列task_done()問題

import threading 
import Queue 

class Producer(threading.Thread): 
    def __init__(self, iq, oq): 
     threading.Thread.__init__(self) 
     self.iq = iq 
     self.oq = oq 

    def produce(self, e): 
     self.oq.put(e*2) 
     self.oq.task_done() 
     print "Producer %s produced %d and put it to output Queue"%(self.getName(), e*2) 

    def run(self): 
     while 1: 
      e = self.iq.get() 
      self.iq.task_done() 
      print "Get %d from input Queue"%(e) 
      self.produce(e) 


class Consumer(threading.Thread): 
    def __init__(self, oq): 
     threading.Thread.__init__(self) 
     self.oq = oq 

    def run(self): 
     while 1: 
      e = self.oq.get() 
      self.oq.task_done() 
      print "Consumer get %d from output queue and consumed"%e 

iq = Queue.Queue() 
oq = Queue.Queue() 

for i in xrange(2): 
    iq.put((i+1)*10) 

for i in xrange(2): 
    t1 = Producer(iq, oq) 
    t1.setDaemon(True) 
    t1.start() 

    t2 = Consumer(oq) 
    t2.setDaemon(True) 
    t2.start() 

iq.join() 
oq.join() 

但是,我每次運行它的時候,它工作不同(給予例外,或消費者不做任何工作)。我認爲問題出在task_done()命令中,任何人都可以向我解釋錯誤在哪裏?

我已經修改Consumer類:

class Consumer(threading.Thread): 
    def __init__(self, oq): 
     threading.Thread.__init__(self) 
     self.oq = oq 

    def run(self): 
     while 1: 
      e = self.oq.get() 
      self.oq.task_done() 
      print "Consumer get %d from output queue and consumed"%e 
      page = urllib2.urlopen("http://www.ifconfig.me/ip") 
      print page 

現在消費後各task_done()命令應連接到網站(它需要一些時間),但它沒有,task_done後的代碼,而不是如果執行時間()很小,它會運行,但如果它很長,它不會運行!爲什麼?任何人都可以解釋我這個問題?如果我把task放在task_done()之前,那麼我會阻塞來自其他線程的隊列,這足夠愚蠢。或者有什麼我缺少關於python中的多線程?

+0

請參閱我的答案中編輯您的其他問題。 – Amr

回答

3

Queuedocs

Queue.task_done()表明,以前排隊的任務就完成了。 由隊列消費者線程使用。對於每個用於獲取任務的get(),後續調用task_done()會告知隊列 上的處理任務已完成。

如果一個連接()當前正在阻止的時候,所有的項目都 處理完畢後,將恢復(這意味着一個task_done()調用收到了已被把每 項目()插入隊列)

例如,在你的代碼你將在你的Producer類如下:

def produce(self, e): 
    self.oq.put(e*2) 
    self.oq.task_done() 
    print "Producer %s produced %d and put it to output Queue"%(self.getName(), e*2) 

你不應該做self.oq.task_done()這裏,因爲你還沒有使用oq.get()

雖然我不確定這是唯一的問題。

編輯:

您的其他問題,你用在最後iq.join()oq.join(),這會導致你的主線程退出其他線程打印檢索頁面之前,既然你創建你的線程作爲Daemons,您的Python應用程序將退出而不等待它們完成執行。 (請記住,Queue.join()取決於Queue.task_done()

現在你在說「如果我把所有東西放在task_done()命令之後,我會阻塞其他線程的隊列」。我看不出你的意思,這隻會阻止你的Consumer線程,但你總是可以創建更多的Consumer線程,這些線程不會被對方阻塞。