2012-05-15 98 views
10

我試圖找到約61億(自定義)項目的最大權重,我想用並行處理來做到這一點。對於我的特殊應用程序,有更好的算法,不需要我重複61億個項目,但解釋它們的教科書超過了我的頭,我的老闆想在4天內完成。我想我的公司的花哨服務器和並行處理有更好的效果。但是,我所瞭解的有關並行處理的一切都來自於讀取Pythondocumentation。也就是說,我很迷茫......避免Python 3中的多處理隊列中的競爭條件隊列

我目前的理論是建立一個饋線過程,輸入隊列,工作流程的一大堆(比如說30)和一個輸出隊列(找到輸出隊列中的最大元素將是微不足道的)。我不明白的是饋線進程如何告訴工作者進程何時停止等待物品通過輸入隊列。

我曾考慮在6.1E9項目的迭代器上使用multiprocessing.Pool.map_async,但只需要將近10分鐘的時間來遍歷項目,而無需對它們執行任何操作。 除非我誤解了某些東西......,有map_async迭代通過它們將它們分配給進程可以在進程開始工作時完成。 (Pool還提供imapdocumentation說,這是類似於map,這似乎並沒有異步工作我想異步,右?)

相關問題:我想用concurrent.futures代替multiprocessing ?我不可能成爲第一個實施雙排隊系統的人(這正是美國每個熟食店裏的線路的工作原理......),那麼是否有更加Pythonic /內置的方式來做到這一點?

這是我正在嘗試做的事情的骨架。 查看中間的評論塊。

import multiprocessing as mp 
import queue 

def faucet(items, bathtub): 
    """Fill bathtub, a process-safe queue, with 6.1e9 items""" 
    for item in items: 
     bathtub.put(item) 
    bathtub.close() 

def drain_filter(bathtub, drain): 
    """Put maximal item from bathtub into drain. 
    Bathtub and drain are process-safe queues. 
    """ 
    max_weight = 0 
    max_item = None 
    while True: 
     try: 
      current_item = bathtub.get() 
     # The following line three lines are the ones that I can't 
     # quite figure out how to trigger without a race condition. 
     # What I would love is to trigger them AFTER faucet calls 
     # bathtub.close and the bathtub queue is empty. 
     except queue.Empty: 
      drain.put((max_weight, max_item)) 
      return 
     else: 
      bathtub.task_done() 
     if not item.is_relevant(): 
      continue 
     current_weight = item.weight 
     if current_weight > max_weight: 
      max_weight = current_weight 
      max_item = current_item 

def parallel_max(items, nprocs=30): 
    """The elements of items should have a method `is_relevant` 
    and an attribute `weight`. `items` itself is an immutable 
    iterator object. 
    """ 
    bathtub_q = mp.JoinableQueue() 
    drain_q = mp.Queue() 

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q)) 
    worker_procs = mp.Pool(processes=nprocs) 

    faucet_proc.start() 
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q) 

    finalists = [] 
    for i in range(nprocs): 
     finalists.append(drain_q.get()) 

    return max(finalists) 


這裏的答案

我發現了一個非常全面的回答我的問題,和一個溫柔的介紹在Python基金會聯絡部主任道格·赫爾曼多任務處理。我想要的是「毒丸」模式。看看這裏:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

道具@MRAB發佈該概念的內核。

+0

如果您使用'multiprocessing.Queue',爲什麼'import queue'? –

+0

當工作人員查看他的輸入隊列時,我使用它來捕獲一個'queue.Empty'異常。我的一廂情願的想法是,當且僅當隊列已關閉而且也是空的時候,纔會拋出異常。請注意,根據@ MRAB的方法,在他的答案中,導入'queue'將是不明智的。 – wkschwartz

回答

3

您可以在隊列中放置特殊的終止項目,例如無。當工人看到它時,它可以讓其他工人看到,然後終止。或者,您可以將每個工作人員的一個特殊終止項目放入隊列中。

+1

這是一個非常好的答案。儘管如果有人能回答相關的問題,我會把這個問題打開一會兒。 – wkschwartz

+0

因此,在關閉隊列之前,我在'faucet()'末尾放了'bathtub.put(None)',但是如何處理'task_done()'調用呢?或者我只是從'JoinableQueue'切換到常規'Queue'並擺脫所有'task_done()'調用? – wkschwartz