如果getRenewed
是冪等的(也就是說,您可以多次調用它,而沒有副作用),您可以簡單地將現有的計時器代碼移動到工作進程中,並讓他們在注意到自己的計時器撞倒。這隻需要從您傳遞在列表中的項目同步,並multiprocessing.Pool
可以處理足夠容易:
def setup_worker():
global clockExp, a
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
def worker(item):
global clockExp, a
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 5: # renew with 5 seconds left
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
f(item, a)
def main(L):
pool = multiprocessing.Pool(initializer=setup_worker)
pool.map(worker, L)
如果getRenewed
不冪等,事情將需要更復雜些。您不能在每個工作進程中調用它,因此您需要在進程之間設置某種通信方法,以便每個進程在可用時都可以獲得最新版本。
我建議使用multiprocessing.queue
將a
值從主進程傳遞給工人。您仍然可以使用Pool
作爲列表項,您只需確保在主進程中異步使用它。與此類似,也許:
def setup_worker2(queue):
global x
x = random.random()
global a_queue, a, clockExp
a_queue = queue
a = a_queue.get() # wait for the first `a` value
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
def worker2(item):
global a, clockExp
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed
try:
a = a_queue.get_nowait()
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
except queue.Empty:
pass
return f(item, a)
def main2(L):
queue = multiprocessing.Queue() # setup the queue for the a values
pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,))
result = pool.map_async(worker2, L) # send the items to the pool asynchronously
while True: # loop for sending a values through the queue
a = getRenewed() # get a new item
for _ in range(os.cpu_count()):
queue.put(a) # send one copy per worker process
try:
result.wait(900-5) # sleep for ~15 minutes, or until the result is ready
except multiprocessing.TimeoutError:
pass # if we got a timeout, keep looping!
else:
break # if not, we are done, so break out of the loop!
工人們仍然需要有有一些計時代碼,否則你面臨的競爭條件,其中一個工人可能會消耗兩個單發下來隊列中a
值批量來自主流程。如果對f
的某些調用比其他調用慢得多(如果涉及從網上下載東西的話可能很可能會發生這種情況)。
「f」或「getRenewed」依賴任何特定於進程的狀態,還是隻依賴(或修改)外部狀態? – Blckknght 2014-10-02 22:27:20
f下載一個網站,通過L. getRenewed中的項目口述得到一個認證令牌。 – Kevin 2014-10-02 23:37:14