2014-02-06 81 views
2

我怎麼能腳本,使用兩個隊列,這些的?:Python的多與更新隊列和輸出隊列

  1. 一個作爲與一些數據開始工作隊列,取決於一個Python多進程要並行執行的功能條件,可以動態地接收進一步的任務,另一個收集結果並在處理完成後用於記錄結果。

我基本上需要在工作隊列中放置更多的任務,這取決於我在其初始項目中找到的內容。我在下面發表的例子很愚蠢(我可以根據自己的喜好將其轉換爲輸出隊列),但其機制很清晰,並反映了我需要開發的一部分概念。

特此我嘗試:

import multiprocessing as mp 

def worker(working_queue, output_queue): 
    item = working_queue.get() #I take an item from the working queue 
    if item % 2 == 0: 
     output_queue.put(item**2) # If I like it, I do something with it and conserve the result. 
    else: 
     working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__': 
    static_input = range(100)  
    working_q = mp.Queue() 
    output_q = mp.Queue() 
    for i in static_input: 
     working_q.put(i) 
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?). 
    for proc in processes: 
     proc.start() 
    for proc in processes: 
     proc.join() 
    for result in iter(output_q.get, None): 
     print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible. 

這並沒有結束,也沒有打印任何結果。

在整個過程結束時,我想確保工作隊列是空的,並且所有並行函數都在迭代後取出結果之前寫入輸出隊列。你有如何使其工作的建議?

回答

2

您在創建進程的行中存在拼寫錯誤。它應該是mp.Process,而不是mp.process。這是什麼導致你得到的例外。

此外,您不在工作中循環,因此它們實際上僅從隊列中消耗一個物品,然後退出。在不瞭解更多關於所需邏輯的情況下,提供具體建議並不容易,但您可能需要將worker函數的主體放入while True循環中,並在工作完成時在主體中添加一個條件以退出。

請注意,如果您沒有添加條件以顯式退出循環,則當隊列爲空時,您的工作人員將永遠停止。您可能會考慮使用所謂的毒丸技術來向他們可能退出的工人發出信號。您將在PyMOTW關於Communication Between processes的文章中找到一個示例和一些有用的討論。

至於要使用的進程數量,您需要進行基準測試以找出適合您的方法,但是,一般來說,每個內核的一個進程是您的工作負載受CPU限制的良好起點。如果您的工作負載是IO綁定的,那麼您可能會獲得更好的結果,並且員工數量更多。

+0

謝謝!我將在問題中編輯這個。 – Jaqo

+0

不客氣!請不要在你這樣做的時候編輯我的回覆,開始處理你問題的其餘部分。 – tawmas

+0

我剛剛讀了你答案的其餘部分。我會嘗試應用一段時間True循環。我想知道如果隊列中沒有更多項目可以處理,那麼過程是否完成。我想使用類似隊列長度的東西,但文檔聲明這是不可靠的。 – Jaqo

1

以下代碼實現了預期結果。它遵循@tawmas提出的建議。

此代碼允許在需要一個過程使用多個內核,該工人可以通過將其在處理過程中被更新提要數據的隊列:

import multiprocessing as mp 
def worker(working_queue, output_queue): 
    while True: 
     if working_queue.empty() == True: 
      break #this is the so-called 'poison pill'  
     else: 
      picked = working_queue.get() 
      if picked % 2 == 0: 
        output_queue.put(picked) 
      else: 
       working_queue.put(picked+1) 
    return 

if __name__ == '__main__': 
    static_input = xrange(100)  
    working_q = mp.Queue() 
    output_q = mp.Queue() 
    results_bank = [] 
    for i in static_input: 
     working_q.put(i) 
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] 
    for proc in processes: 
     proc.start() 
    for proc in processes: 
     proc.join() 
    results_bank = [] 
    while True: 
     if output_q.empty() == True: 
      break 
     results_bank.append(output_q.get_nowait()) 
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed. 
    results_bank.sort() 
    print results_bank 
+0

一旦結果隊列爲空,您的打印循環將永久等待。您應該使用get_nowait並顯式捕獲Empty異常以完全退出。 – tawmas

+0

再次感謝您的幫助。我正在做一個 嘗試:\ n打印結果\ n,除了空:\ n break \ n' 這是打印預期的總體結果,但控制檯輸出仍抱怨異常。我認爲我沒有妥善處理它。 – Jaqo

+0

你需要從你的嘗試中的隊列中獲得。 – tawmas