13

我試圖在Python 2.7中使用 Queue.Queue實現多線程生產者 - 消費者模式。我試圖弄清楚如何讓 使用者(即工作者線程)在完成所有必需的工作後停止。如何在多線程生產者 - 使用者模式下完成工作線程後退出工作線程?

見馬丁詹姆斯第二評論這個答案:https://stackoverflow.com/a/19369877/1175080

發送「我完了」的任務,指示池中的線程終止。任何得到這樣的任務的線程都會重新執行它,然後自殺。

但這不適用於我。例如,請參閱以下代碼。

import Queue 
import threading 
import time 

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     data = q.get() 
     print 'worker', n, 'got', data 
     time.sleep(1) # Simulate noticeable data processing time 
     q.task_done() 
     if data == -1: # -1 is used to indicate that the worker should stop 
      # Requeue the exit indicator. 
      q.put(-1) 
      # Commit suicide. 
      print 'worker', n, 'is exiting' 
      break 

def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Create 3 workers. 
    for i in range(3): 
     t = threading.Thread(target=worker, args=(i, q)) 
     t.start() 

    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 
     time.sleep(0.5) 

    # Send an exit indicator for all threads to consume. 
    q.put(-1) 

    print 'waiting for workers to finish ...' 
    q.join() 
    print 'done' 

master() 

該程序掛起所有三個工人已經閱讀出口指示燈, 即-1從隊列後,因爲每個工人重新排隊-1前 退出,所以隊列永遠不會成爲空和q.join()永遠不會返回。

我想出了以下但醜陋的解決方案,我通過隊列發送-1出口 指標爲每名工人,使每個工人可以看到它 和自殺。但是,我必須爲每個工作人員發送退出指示 ,這一事實感覺有點難看。

import Queue 
import threading 
import time 

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     data = q.get() 
     print 'worker', n, 'got', data 
     time.sleep(1) # Simulate noticeable data processing time 
     q.task_done() 
     if data == -1: # -1 is used to indicate that the worker should stop 
      print 'worker', n, 'is exiting' 
      break 

def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Create 3 workers. 
    for i in range(3): 
     t = threading.Thread(target=worker, args=(i, q)) 
     t.start() 

    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 
     time.sleep(0.5) 

    # Send one stop indicator for each worker. 
    for i in range(3): 
     q.put(-1) 

    print 'waiting for workers to finish ...' 
    q.join() 
    print 'done' 

master() 

我有兩個問題。

  1. 是否可以爲所有線程發送單個退出指示符的方法(如Martin James的https://stackoverflow.com/a/19369877/1175080的第二條評論所述)甚至可以工作?
  2. 如果上一個問題的答案是「否」,是否有辦法解決問題,我不必爲每個工作線程發送單獨的退出指示符?
+0

發送每個工作看起來像一個很好的解決方案爲我的殺人信號,我不會說是如此醜陋。您也可以加入線程而不是加入隊列 – Netwave

+2

請注意,有一個['ThreadPool'](https://stackoverflow.com/a/3386632/3767239)類可用,它負責「手動」分配任務在多個線程之間。你可以'加入'這樣的池(而不是隊列),然後發送*「停止」*信號將最終終止所有線程。其實我不明白你爲什麼要「加入」隊列而不是線程。使用Python 3,你可以通過[concurrent](https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor)模塊獲得更多的功能(並且更好的記錄)。 –

+0

一些額外的評論。從你的示例代碼中不清楚爲什麼你會首先使用這樣一個*「stop」*命令(你可以把這個部分放出來,等待隊列加入)。然後 - 如果你使用了這樣的命令 - 不能保證每個線程都會「正常」關閉:'q.join()'可能會在所有線程收到'-1'之前恢復,因爲你調用了'q'。在重新將'-1'加入到隊列之前task_done()'(這意味着在重新放置'-1'之前任務計數可以達到零(這增加了計數)並且因此'q.join()'可以恢復)。 –

回答

11

不要將它稱爲任務的特例。

改爲使用Event,爲您的員工實施非阻塞實施。

stopping = threading.Event() 

def worker(n, q, timeout=1): 
    # run until the master thread indicates we're done 
    while not stopping.is_set(): 
     try: 
      # don't block indefinitely so we can return to the top 
      # of the loop and check the stopping event 
      data = q.get(True, timeout) 
     # raised by q.get if we reach the timeout on an empty queue 
     except queue.Empty: 
      continue 
     q.task_done() 

def master(): 
    ... 

    print 'waiting for workers to finish' 
    q.join() 
    stopping.set() 
    print 'done' 
+1

我相信你的意思是'q.get(True,timeout)',即你的意思是'q.get()'的'block'參數是'True'而不是'False'。如果將它設置爲'False',那麼'q.get()'根本不會被阻止,並立即返回,這將導致'while'循環非常快地旋轉。如果您將其設置爲「True」,則在解鎖前阻止「超時」秒。 –

+0

你說得對,我知道了。固定。 :) –

11

可以發送一個出口指標的所有線程(如馬丁詹姆斯https://stackoverflow.com/a/19369877/1175080第二註釋解釋)的方法,甚至工作?

正如你已經注意到它不能工作,傳播消息將使最後一個線程更新隊列與一個更多的項目,因爲你正在等待一個永遠不會是空的隊列,而不是你的代碼有。

如果上一個問題的答案是「否」,是否有辦法解決問題的方式,我不必爲每個工作線程發送單獨的退出指示符?

可以join線程,而不是隊列:

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     data = q.get() 
     print 'worker', n, 'got', data 
     time.sleep(1) # Simulate noticeable data processing time 
     q.task_done() 
     if data == -1: # -1 is used to indicate that the worker should stop 
      # Requeue the exit indicator. 
      q.put(-1) 
      # Commit suicide. 
      print 'worker', n, 'is exiting' 
      break 

def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Create 3 workers. 
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)] 
    for t in threads: 
     threads.start() 
    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 
     time.sleep(0.5) 

    # Send an exit indicator for all threads to consume. 
    q.put(-1) 

    print 'waiting for workers to finish ...' 
    for t in threads: 
     t.join() 
    print 'done' 

master() 

由於Queue documentation解釋get方法將上升,一旦其空的execption所以如果你已經知道的數據處理您可以填寫隊列和那麼垃圾郵件的主題:

import Queue 
import threading 
import time 

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     try: 
      data = q.get(block=False, timeout=1) 
      print 'worker', n, 'got', data 
      time.sleep(1) # Simulate noticeable data processing time 
      q.task_done() 
     except Queue.Empty: 
      break 


def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 

    # Create 3 workers. 
    for i in range(3): 
     t = threading.Thread(target=worker, args=(i, q)) 
     t.start() 

    print 'waiting for workers to finish ...' 
    q.join() 
    print 'done' 

master() 

這裏有一個live example

+0

你能回答我的第一個問題嗎? –

+0

@LoneLearner,是的,給我一個:) – Netwave

+0

@DanielSanchez你的回答很好,但你的第二個例子改變了這個問題。 OP首先實例化他的工作線程,然後填充隊列,同時交換這些任務。如果您沒有交換這些任務,那麼如果在添加工作負載之前看到空隊列,則可能會有一些工作線程提早退出(由於超時)。刪除附加線程通信信號的方法是適當的。就個人而言,我認爲有線程塊的工作和主線程控制退出是大多數應用程序的最乾淨的方法。 – tdube

3

除了@DanielSanchez出色答卷,我建議實際上 依靠類似的機制爲Java CountDownLatch

要點之中,

  • 創建latch只開放以後一定計數器下去
  • 當鎖被打開時,該線程(或多個)等待它將被允許繼續執行。

  • 我做了一個過於簡單的例子,檢查here一類像例子這樣的鎖存:

    import threading 
    import Queue 
    import time 
    
    WORKER_COUNT = 3 
    latch = threading.Condition() 
    count = 3 
    
    def wait(): 
        latch.acquire() 
        while count > 0: 
         latch.wait() 
        latch.release() 
    
    def count_down(): 
        global count 
        latch.acquire() 
        count -= 1 
        if count <= 0: 
         latch.notify_all() 
        latch.release() 
    
    def worker(n, q): 
        # n - Worker ID 
        # q - Queue from which to receive data 
        while True: 
         data = q.get() 
         print 'worker', n, 'got', data 
         time.sleep(1) # Simulate noticeable data processing time 
         q.task_done() 
         if data == -1: # -1 is used to indicate that the worker should stop 
          # Requeue the exit indicator. 
          q.put(-1) 
          # Commit suicide. 
          count_down() 
          print 'worker', n, 'is exiting' 
          break 
    
    # master() sends data to worker() via q. 
    
    def master(): 
        q = Queue.Queue() 
    
        # Create 3 workers. 
        for i in range(WORKER_COUNT): 
         t = threading.Thread(target=worker, args=(i, q)) 
         t.start() 
    
        # Send 10 items to work on. 
        for i in range(10): 
         q.put(i) 
         time.sleep(0.5) 
    
        # Send an exit indicator for all threads to consume. 
        q.put(-1) 
        wait() 
        print 'done' 
    
    master() 
    
4

只是爲了完整性緣故 你也可以排隊停止信號是 - (線程數)。 每個線程則可以通過一個遞增並重新排隊它只有在停止信號!= 0

if data < 0: # negative numbers are used to indicate that the worker should stop 
     if data < -1: 
      q.put(data + 1) 
     # Commit suicide. 
     print 'worker', n, 'is exiting' 
     break 

但我會親自去與Travis Mehlinger Daniel Sanchez答案。