2017-03-28 30 views
1

假設我有以下多重結構:如何使用多處理從輸出隊列「批量寫入」?

import multiprocessing as mp 
def worker(working_queue, output_queue): 
    while True: 
     if working_queue.empty() == True: 
      break 
     else: 
      picked = working_queue.get() 
      res_item = "Number " + str(picked) 
      output_queue.put(res_item) 
    return 

if __name__ == '__main__': 
    static_input = xrange(100)  
    working_q = mp.Queue() 
    output_q = mp.Queue() 
    results_bank = [] 
    for i in static_input: 
     working_q.put(i) 
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(2)] 
    for proc in processes: 
     proc.start() 
    for proc in processes: 
     proc.join() 
    results_bank = [] 
    while True: 
     if output_q.empty() == True: 
      break 
     results_bank.append(output_q.get_nowait()) 
    if len(results_bank) == len(static_input): 
     print "Good run" 
    else: 
     print "Bad run" 

我的問題:如何將我「批」寫我的結果到一個文件,而working_queue仍在「工作」(或至少,沒有完成)?

注意:我的實際數據結構對相對於輸入的無序結果不敏感(儘管我的示例使用了整數)。

此外,我認爲從輸出隊列寫入批次/集合是最佳實踐,而不是來自增長結果庫對象。但是,我願意接受任何一種方法。我是多處理新手,對這個問題不確定最佳實踐或最有效的解決方案。

+0

@martineau感謝澄清'批'。我自己也要做同樣的事情。 –

+0

好像你可以在'worker()'中有一個嵌套循環,直到working_queue被清空。這是你的意思是「批量寫入」嗎? – martineau

+0

不,我的數據需求更多「隨着結果的積累,寫出來」。比如說,把結果寫成'sets'爲5. 根據我對你的建議的理解,這將在working_queue結尾寫出結果,這基本上等於我的'成長對象'(結果銀行)在上面的例子。 還是你的意思是建議,因爲working_queue'清除'或'刷新'我可以寫出結果集? –

回答

1

如果你想使用mp.Process ES和mp.Queue S,這裏是處理分批結果的方式。其主要思想是在writer功能,如下:

import itertools as IT 
import multiprocessing as mp 
SENTINEL = None 
static_len = 100 

def worker(working_queue, output_queue): 
    for picked in iter(working_queue.get, SENTINEL): 
     res_item = "Number {:2d}".format(picked) 
     output_queue.put(res_item) 

def writer(output_queue, threshold=10): 
    result_length = 0 
    items = iter(output_queue.get, SENTINEL) 
    for batch in iter(lambda: list(IT.islice(items, threshold)), []): 
     print('\n'.join(batch)) 
     result_length += len(batch) 
    state = 'Good run' if result_length == static_len else 'Bad run' 
    print(state) 

if __name__ == '__main__': 
    num_workers = 2 

    static_input = range(static_len) 
    working_q = mp.Queue() 
    output_q = mp.Queue() 

    writer_proc = mp.Process(target=writer, args=(output_q,)) 
    writer_proc.start() 

    for i in static_input: 
     working_q.put(i) 

    processes = [mp.Process(target=worker, args=(working_q, output_q)) 
       for i in range(num_workers)] 
    for proc in processes: 
     proc.start() 
     # Put SENTINELs in the Queue to tell the workers to exit their for-loop 
     working_q.put(SENTINEL) 
    for proc in processes: 
     proc.join() 

    output_q.put(SENTINEL) 
    writer_proc.join() 

當兩個參數:iter預計可贖回和哨兵: iter(callable, sentinel)。可調用(即函數)被重複調用,直到它返回一個等於sentinel的值。所以

items = iter(output_queue.get, SENTINEL) 

定義items是一個迭代,當遍歷,將output_queue 返回的項目,直到output_queue.get()回報SENTINEL

for-loop

for batch in iter(lambda: list(IT.islice(items, threshold)), []): 

反覆調用lambda函數,直到返回一個空列表。當被調用時,lambda函數返回一個列表,最多threshold可迭代的items中的項目數。因此,這是「由n項分組而沒有填充」的成語。有關此成語的更多信息,請參閱this post


請注意,這不是一個很好的實踐來檢驗working_q.empty()。它可能導致競爭狀況。例如,假設我們有在這些線路上的2個worker過程時working_q只有1個留在它的項目:

def worker(working_queue, output_queue): 
    while True: 
     if working_queue.empty() == True:  <-- Process-1 
      break 
     else: 
      picked = working_queue.get()   <-- Process-2 
      res_item = "Number " + str(picked) 
      output_queue.put(res_item) 
    return 

假設Process-1電話working_queue.empty()同時還有在隊列中的一個項目。所以它返回False。然後Process-2調用working_queue.get()並獲得最後一個項目。然後Process-1進入picked = working_queue.get()行並掛起,因爲隊列中沒有更多項目。

因此,使用哨兵(如上所示)具體發出信號應當何時停止而不是檢查queue.empty()應停止。

+0

請問你能解釋一下你的代碼開始於'res_item in iter(output_queue.get,SENTINEL):' 特別是,我不確定爲什麼(或如何)你的'if len(batch)> = threshold '聲明在'if len(batch)'中是'重複的'...看起來您擴展了結果對象,無論是否已達到閾值? 無論閾值,批處理等如何,打印都很好,但理解「批量」長度等於閾值以便用寫入文件替換「打印」語句至關重要。 –

+0

@DVHughes:我在上面添加了一些解釋詞。如果有什麼不清楚的,請告訴我。 – unutbu

+0

好的!謝謝,我認爲這是你提到的'剩餘'批次。只是想在添加任何寫入語句之前確定。 如果len(批處理)使得它看起來像你測試的批次不是空的..但實際上你是在測試是否有任何殘留/遺留。相同的區別,但是在概念上清楚是很好的,特別是對於其他人而言,隨着時間的推移,這些問題在線程中發生。 謝謝!在我的示例代碼中,這非常容易理解並與我的基於隊列的多進程方法相一致。 –

0

沒有像「批q.get」這樣的操作。但是,將一批物品而不是一件物品放入/彈出是一種很好的做法。

這正是multiprocessing.Pool.map與它的參數chunksize :)

做爲了儘快有Pool.imap_unordered它返回一個迭代器而不是列表寫入輸出。

def work(item): 
    return "Number " + str(item) 

import multiprocessing 
static_input = range(100) 
chunksize = 10 
with multiprocessing.Pool() as pool: 
    for out in pool.imap_unordered(work, static_input, chunksize): 
     print(out) 
相關問題