我試圖找到約61億(自定義)項目的最大權重,我想用並行處理來做到這一點。對於我的特殊應用程序,有更好的算法,不需要我重複61億個項目,但解釋它們的教科書超過了我的頭,我的老闆想在4天內完成。我想我的公司的花哨服務器和並行處理有更好的效果。但是,我所瞭解的有關並行處理的一切都來自於讀取Pythondocumentation。也就是說,我很迷茫......避免Python 3中的多處理隊列中的競爭條件隊列
我目前的理論是建立一個饋線過程,輸入隊列,工作流程的一大堆(比如說30)和一個輸出隊列(找到輸出隊列中的最大元素將是微不足道的)。我不明白的是饋線進程如何告訴工作者進程何時停止等待物品通過輸入隊列。
我曾考慮在6.1E9項目的迭代器上使用multiprocessing.Pool.map_async
,但只需要將近10分鐘的時間來遍歷項目,而無需對它們執行任何操作。 除非我誤解了某些東西......,有map_async
迭代通過它們將它們分配給進程可以在進程開始工作時完成。 (Pool
還提供imap
但documentation說,這是類似於map
,這似乎並沒有異步工作我想異步,右?)
相關問題:我想用concurrent.futures
代替multiprocessing
?我不可能成爲第一個實施雙排隊系統的人(這正是美國每個熟食店裏的線路的工作原理......),那麼是否有更加Pythonic /內置的方式來做到這一點?
這是我正在嘗試做的事情的骨架。 查看中間的評論塊。
import multiprocessing as mp
import queue
def faucet(items, bathtub):
"""Fill bathtub, a process-safe queue, with 6.1e9 items"""
for item in items:
bathtub.put(item)
bathtub.close()
def drain_filter(bathtub, drain):
"""Put maximal item from bathtub into drain.
Bathtub and drain are process-safe queues.
"""
max_weight = 0
max_item = None
while True:
try:
current_item = bathtub.get()
# The following line three lines are the ones that I can't
# quite figure out how to trigger without a race condition.
# What I would love is to trigger them AFTER faucet calls
# bathtub.close and the bathtub queue is empty.
except queue.Empty:
drain.put((max_weight, max_item))
return
else:
bathtub.task_done()
if not item.is_relevant():
continue
current_weight = item.weight
if current_weight > max_weight:
max_weight = current_weight
max_item = current_item
def parallel_max(items, nprocs=30):
"""The elements of items should have a method `is_relevant`
and an attribute `weight`. `items` itself is an immutable
iterator object.
"""
bathtub_q = mp.JoinableQueue()
drain_q = mp.Queue()
faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
worker_procs = mp.Pool(processes=nprocs)
faucet_proc.start()
worker_procs.apply_async(drain_filter, bathtub_q, drain_q)
finalists = []
for i in range(nprocs):
finalists.append(drain_q.get())
return max(finalists)
這裏的答案
我發現了一個非常全面的回答我的問題,和一個溫柔的介紹在Python基金會聯絡部主任道格·赫爾曼多任務處理。我想要的是「毒丸」模式。看看這裏:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
道具@MRAB發佈該概念的內核。
如果您使用'multiprocessing.Queue',爲什麼'import queue'? –
當工作人員查看他的輸入隊列時,我使用它來捕獲一個'queue.Empty'異常。我的一廂情願的想法是,當且僅當隊列已關閉而且也是空的時候,纔會拋出異常。請注意,根據@ MRAB的方法,在他的答案中,導入'queue'將是不明智的。 – wkschwartz