假設我有以下多重結構:如何使用多處理從輸出隊列「批量寫入」?
import multiprocessing as mp
def worker(working_queue, output_queue):
while True:
if working_queue.empty() == True:
break
else:
picked = working_queue.get()
res_item = "Number " + str(picked)
output_queue.put(res_item)
return
if __name__ == '__main__':
static_input = xrange(100)
working_q = mp.Queue()
output_q = mp.Queue()
results_bank = []
for i in static_input:
working_q.put(i)
processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(2)]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
results_bank = []
while True:
if output_q.empty() == True:
break
results_bank.append(output_q.get_nowait())
if len(results_bank) == len(static_input):
print "Good run"
else:
print "Bad run"
我的問題:如何將我「批」寫我的結果到一個文件,而working_queue仍在「工作」(或至少,沒有完成)?
注意:我的實際數據結構對相對於輸入的無序結果不敏感(儘管我的示例使用了整數)。
此外,我認爲從輸出隊列寫入批次/集合是最佳實踐,而不是來自增長結果庫對象。但是,我願意接受任何一種方法。我是多處理新手,對這個問題不確定最佳實踐或最有效的解決方案。
@martineau感謝澄清'批'。我自己也要做同樣的事情。 –
好像你可以在'worker()'中有一個嵌套循環,直到working_queue被清空。這是你的意思是「批量寫入」嗎? – martineau
不,我的數據需求更多「隨着結果的積累,寫出來」。比如說,把結果寫成'sets'爲5. 根據我對你的建議的理解,這將在working_queue結尾寫出結果,這基本上等於我的'成長對象'(結果銀行)在上面的例子。 還是你的意思是建議,因爲working_queue'清除'或'刷新'我可以寫出結果集? –