2014-10-02 54 views
1

我有一個大的列表L來操作。令f()是對L進行操作的函數。f()需要另一個變量,每15分鐘過期並需要更新。下面是一個例子,在串行:受到定時器的多處理器

def main(): 
    L = openList() 
    # START THE CLOCK 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 
    for item in L: 
     f(item, a) # operate on item given a 
     # CHECK TIME REMAINING 
     clockCur = dt.datetime.now() 
     clockRem = (clockExp - clockCur).total_seconds() 
     # RENEW a IF NEEDED 
     if clockRem < 5: # renew with 5 seconds left 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
      a = getRenewed() 

因爲f()需要幾秒鐘(或更長有時),我想並行化代碼。給定計時器的任何提示怎麼做?我設想共享clockExp和「a」,當進程滿足clockRem < 5時,它調用getRenewed()並共享新的「a」和clockExp,然後重複。

+0

「f」或「getRenewed」依賴任何特定於進程的狀態,還是隻依賴(或修改)外部狀態? – Blckknght 2014-10-02 22:27:20

+0

f下載一個網站,通過L. getRenewed中的項目口述得到一個認證令牌。 – Kevin 2014-10-02 23:37:14

回答

3

如果getRenewed是冪等的(也就是說,您可以多次調用它,而沒有副作用),您可以簡單地將現有的計時器代碼移動到工作進程中,並讓他們在注意到自己的計時器撞倒。這隻需要從您傳遞在列表中的項目同步,並multiprocessing.Pool可以處理足夠容易:

def setup_worker(): 
    global clockExp, a 

    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 

def worker(item): 
    global clockExp, a 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 

    if clockRem < 5: # renew with 5 seconds left 
     clockStart = dt.datetime.now() 
     clockExp = clockStart + dt.timedelta(seconds=900) 
     a = getRenewed() 

    f(item, a) 

def main(L): 
    pool = multiprocessing.Pool(initializer=setup_worker) 

    pool.map(worker, L) 

如果getRenewed不冪等,事情將需要更復雜些。您不能在每個工作進程中調用它,因此您需要在進程之間設置某種通信方法,以便每個進程在可用時都可以獲得最新版本。

我建議使用multiprocessing.queuea值從主進程傳遞給工人。您仍然可以使用Pool作爲列表項,您只需確保在主進程中異步使用它。與此類似,也許:

def setup_worker2(queue): 
    global x 
    x = random.random() 
    global a_queue, a, clockExp 

    a_queue = queue 
    a = a_queue.get() # wait for the first `a` value 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 

def worker2(item): 
    global a, clockExp 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 
    if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed 
     try: 
      a = a_queue.get_nowait() 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
     except queue.Empty: 
      pass 

    return f(item, a) 

def main2(L): 
    queue = multiprocessing.Queue()  # setup the queue for the a values 

    pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,)) 

    result = pool.map_async(worker2, L) # send the items to the pool asynchronously 

    while True:     # loop for sending a values through the queue 
     a = getRenewed()   # get a new item 
     for _ in range(os.cpu_count()): 
      queue.put(a)   # send one copy per worker process 

     try: 
      result.wait(900-5) # sleep for ~15 minutes, or until the result is ready 
     except multiprocessing.TimeoutError: 
      pass     # if we got a timeout, keep looping! 
     else: 
      break     # if not, we are done, so break out of the loop! 

工人們仍然需要有有一些計時代碼,否則你面臨的競爭條件,其中一個工人可能會消耗兩個單發下來隊列中a值批量來自主流程。如果對f的某些調用比其他調用慢得多(如果涉及從網上下載東西的話可能很可能會發生這種情況)。

+0

感謝您的提示。事實證明,我可以擁有多個活動令牌,所以第一個解決方案可以工作! – Kevin 2014-10-04 18:34:43