2017-03-02 55 views
4

我正在學習Python的線程模塊,並寫了下面的代碼,以幫助自己瞭解爲什麼Queue.join()在這裏是必需的?

from Queue import Queue 
import threading 

lock = threading.Lock() 
MAX_THREADS = 8 
q = Queue() 
count = 0 

# some i/o process 
def io_process(x): 
    pass 

# process that deals with shared resources 
def shared_resource_process(x): 
    pass 

def func(): 
    global q, count 
    while not q.empty(): 
     x = q.get() 
     io_process(x) 
     if lock.acquire(): 
      shared_resource_process(x) 
      print '%s is processing %r' %(threading.currentThread().getName(), x) 
      count += 1   
      lock.release() 

def main(): 
    global q 
    for i in range(40): 
     q.put(i) 

    threads = [] 
    for i in range(MAX_THREADS): 
     threads.append(threading.Thread(target=func)) 

    for t in threads: 
     t.start() 

    for t in threads: 
     t.join() 

    print 'multi-thread done.' 
    print count == 40 

if __name__ == '__main__': 
    main() 

和輸出被困這樣的:

Thread-1 is processing 32 
Thread-8 is processing 33 
Thread-6 is processing 34 
Thread-2 is processing 35 
Thread-5 is processing 36 
Thread-3 is processing 37 
Thread-7 is processing 38 
Thread-4 is processing 39 

注意,在主)的打印(不這意味着一些線程掛起/阻塞?

然後我加入q.task_done()修改FUNC()方法:

if lock.acquire(): 
      shared_resource_process(x) 
      print '%s is processing %r' %(threading.currentThread().getName(), x) 
      count += 1 
      q.task_done() # why is this necessary ? 
      lock.release() 

,現在所有的線程終止如我所料,並得到正確的輸出:

Thread-6 is processing 36 
Thread-4 is processing 37 
Thread-3 is processing 38 
Thread-7 is processing 39 
multi-thread done. 
True 

Process finished with exit code 0 

我讀Queue.Queue here的文檔,並看到task_done()與queue.join()一起工作,以確保處理隊列中的所有項目。但是因爲我沒有在main()中調用queue.join(),爲什麼在func()中需要task_done()?當我錯過task_done()代碼時,線程掛起/阻塞的原因是什麼?

回答

3

您的代碼中存在競爭條件。試想一下,你只有留在Queue一個項目,你會使用只有兩個線程,而不是8。然後下面的事件序列發生:

  1. 線程A調用q.empty,以檢查它是否是空的或不是。由於隊列中有一個項目是False並且循環體被執行。
  2. 在線程A調用q.get之前,有一個上下文切換,線程B開始運行。
  3. 線程B調用q.empty,隊列中還有一個項目,因此結果爲False並且循環體被執行。
  4. 線程B不帶參數調用q.get,並立即返回隊列中的最後一項。然後,線程B處理該項目並退出,因爲q.empty返回True
  5. 線程A運行。既然它已經在步驟1中調用了q.empty,它將在下一個調用q.get,但是這會永遠阻止,因此你的程序不會終止。

您可以通過導入time和改變環路有點模仿上述行爲:

while not q.empty(): 
    time.sleep(0.1) # Force context switch 
    x = q.get() 

注意行爲是相同的,無論是否task_done被稱爲與否。

那麼爲什麼添加task_done有幫助?默認情況下,Python 2將每100個解釋器指令執行上下文切換,因此添加代碼可能會改變發生上下文切換的位置。有關更好的解釋,請參閱another questionlinked PDF。在我的機器上,程序沒有掛起,不管是否有task_done在那裏,所以這只是一個猜測是什麼導致它發生在你身上。

如果你想修復行爲,你可以只是無限循環,並將參數傳遞給get,指示它不阻塞。這將導致get最終拋出Queue.Empty異常,你可以捕獲並打破循環:

from Queue import Queue, Empty 

def func(): 
    global q, count 
    while True: 
     try: 
      x = q.get(False) 
     except Empty: 
      break 
     io_process(x) 
     if lock.acquire(): 
      shared_resource_process(x) 
      print '%s is processing %r' %(threading.currentThread().getName(), x) 
      count += 1 
      lock.release() 
+0

謝謝您的回答,很明顯,真正幫助 – vansdev