2014-02-07 18 views
1

從下面的代碼中,我期望結果列表的長度與多進程所在的項目範圍之一相同:Python多重處理的輸出隊列提供了比預期更多的結果

import multiprocessing as mp 

def worker(working_queue, output_queue): 
    while True: 
     if working_queue.empty() is True: 
      break #this is supposed to end the process. 
     else: 
      picked = working_queue.get() 
      if picked % 2 == 0: 
       output_queue.put(picked) 
      else: 
       working_queue.put(picked+1) 
    return 

if __name__ == '__main__': 
    static_input = xrange(100)  
    working_q = mp.Queue() 
    output_q = mp.Queue() 
    for i in static_input: 
     working_q.put(i) 
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] 
    for proc in processes: 
     proc.start() 
    for proc in processes: 
     proc.join() 
    results_bank = [] 
    while True: 
     if output_q.empty() is True: 
      break 
     else: 
      results_bank.append(output_q.get()) 
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed. 
    results_bank.sort() 
    print results_bank 

有沒有人有關如何使此代碼正常運行的任何想法?

+0

順便說一句,我將十分感謝,如果你能幫助我實現什麼使一個多Python代碼不明智的操作系統平臺。如果在Windows 7或MacOS中運行,上述代碼的行爲會有所不同;在前者中,控制檯不響應,而在後者中重複結果中的項目。 – Jaqo

回答

1

此代碼永遠不會停止:

每個工人都從隊列中獲得一個項目,只要它不是空的:

picked = working_queue.get() 

,並提出爲每個它得到了一個新問題:

working_queue.put(picked+1) 

因此,隊列將永遠不會爲空,除非進程之間的時間恰好是隊列爲空時,其中一個進程調用empty()。由於隊列長度最初爲100,並且您擁有與cpu_count()一樣多的進程,所以如果這種情況在任何實際系統上停止,我都會感到驚訝。

那麼稍微修改一下代碼就證明我錯了,它在某個時刻停止了,這實際上讓我感到驚訝。用一個進程執行代碼似乎存在一個錯誤,因爲一段時間後進程凍結但不返回。多個進程的結果是不同的。

在循環迭代中添加一個短暫的休眠期使得代碼按照我預期的方式工作,並在上面進行了解釋。雖然它們應該是線程安全的,但似乎在Queue.put,Queue.getQueue.empty之間有一些時間問題。除去empty測試也給出了預期的結果(沒有陷入空隊列)。

找到不同行爲的原因。放入隊列的對象不會立即刷新。因此,empty可能會返回False,但隊列中的項目正在等待刷新。

documentation

:當一個對象被放在一個隊列,該目的是酸洗和 後臺線程以後的酸洗數據刷新到底層 管。這有一些令人驚訝的後果,但 不應該導致任何實際困難 - 如果他們真的打擾 你,那麼你可以改爲使用由經理創建的隊列。

  1. 把一個物體在一個空隊列後可能會有一個微小的延遲隊列的空()方法返回False和get_nowait(前)可以在不提高Queue.Empty返回。

  2. 如果有多個進程正在排列對象,那麼可能會在另一端無序地接收對象。然而,由相同過程入隊的對象將始終按預期順序相對於彼此。

+0

感謝您的回答。我在這兩個空測試中都包含了缺少的其他部分,但它沒有奏效。我試圖刪除空測試,但過程仍然卡住。我使用空測試來確保過程在某個時刻結束。否則,在詢問空隊列中的數據時,進程會卡住。我認爲.emtpy()方法的不可靠性只是上一輪進程的一個問題,當隊列中的項目很少時,CPU錯誤地將隊列讀取爲空,而其他CPU只是排隊更多項目。在這種情況下,單個CPU應該能夠完成工作。 – Jaqo

+0

能否使用.lock而不是全局變量(其長度與原始排隊數據的數量進行比較)是解決方案,以便在它們陷入空隊列之前結束進程? – Jaqo

+0

@Yag一個簡單的解決方法是給'get()'增加一個小超時,這樣當隊列不再遞送項目而不是檢查'空'時就會拋出異常。當然,似乎也不能保證它一直工作。否則,我想你需要使用共享狀態。儘管我沒有意識到,但可能有更好的解決方案。 – Nabla