2016-05-11 90 views
0

我有一些對象需要處理。我想知道是否有辦法根據唯一鍵將工作(流程)分配給對象。
當代碼第一次看到對象時,它應該被隨機分配一個工作者,但是如果對象再次出現,它應該被分配給之前處理該對象的工作者。謝謝
蟒蛇多處理池分配對象到工作人員

例如:
worker A,B,C |第一束對象1,2,3,4第二組對象1,3-
第一束物體:
工人甲< --- 1,3-
工人乙< --- 2
工人Ç< --- 4個
第二組對象:
工人甲< --- 1,3-
工人乙< ---
工人ç< ---

+0

通常用於描述您要查找的術語是「粘性會話」。 –

+0

@JohnZwinck謝謝你的回覆。有沒有我可以檢查的參考?謝謝。 – galaxyan

回答

0

一個非常簡單的方法來IMPL ement「粘性會話」是讓你自己的版本multiprocessing.Pool不急於分配工作項目,但確定性地分配它們。下面是一個不完整的,但運行的解決方案:

import multiprocessing 
import os 
import time 

def work(job): 
    time.sleep(1) 
    print "I am process", os.getpid(), "processing job", job 

class StickyPool: 
    def __init__(self, processes): 
     self._inqueues = [multiprocessing.Queue() for ii in range(processes)] 
     self._pool = [multiprocessing.Process(target=self._run, args=(self._inqueues[ii],)) for ii in range(processes)] 
     for process in self._pool: 
      process.start() 

    def map(self, fn, args): 
     for arg in args: 
      ii = hash(arg) % len(self._inqueues) 
      self._inqueues[ii].put((fn, arg)) 

    def _run(self, queue): 
     while True: 
      fn, arg = queue.get() 
      fn(arg) 

pool = StickyPool(3) 
#pool = multiprocessing.Pool(3)                       

pool.map(work, [1,2,3,4,1,2,3,4,1,2,3,4]) 
time.sleep(4) 

當使用上述StickyPool,工作是基於他們的論據哈希分配。這意味着每次都有相同的參數進入相同的過程。如果有很多哈希值相互獨立的值,那麼平均分配工作就不夠聰明,但是爲將來的改進提供了很好的空間。我也沒有關閉關機邏輯,所以如果您使用StickyPool,程序不會停止運行,但是如果您使用的是multiprocessing.Pool,則程序不會停止運行。解決這些問題並實現更多的Pool接口(如apply(),並返回map()的結果)作爲練習。

+0

非常感謝。 – galaxyan