一個非常簡單的方法來IMPL ement「粘性會話」是讓你自己的版本multiprocessing.Pool
不急於分配工作項目,但確定性地分配它們。下面是一個不完整的,但運行的解決方案:
import multiprocessing
import os
import time
def work(job):
time.sleep(1)
print "I am process", os.getpid(), "processing job", job
class StickyPool:
def __init__(self, processes):
self._inqueues = [multiprocessing.Queue() for ii in range(processes)]
self._pool = [multiprocessing.Process(target=self._run, args=(self._inqueues[ii],)) for ii in range(processes)]
for process in self._pool:
process.start()
def map(self, fn, args):
for arg in args:
ii = hash(arg) % len(self._inqueues)
self._inqueues[ii].put((fn, arg))
def _run(self, queue):
while True:
fn, arg = queue.get()
fn(arg)
pool = StickyPool(3)
#pool = multiprocessing.Pool(3)
pool.map(work, [1,2,3,4,1,2,3,4,1,2,3,4])
time.sleep(4)
當使用上述StickyPool
,工作是基於他們的論據哈希分配。這意味着每次都有相同的參數進入相同的過程。如果有很多哈希值相互獨立的值,那麼平均分配工作就不夠聰明,但是爲將來的改進提供了很好的空間。我也沒有關閉關機邏輯,所以如果您使用StickyPool
,程序不會停止運行,但是如果您使用的是multiprocessing.Pool
,則程序不會停止運行。解決這些問題並實現更多的Pool
接口(如apply()
,並返回map()
的結果)作爲練習。
通常用於描述您要查找的術語是「粘性會話」。 –
@JohnZwinck謝謝你的回覆。有沒有我可以檢查的參考?謝謝。 – galaxyan