2017-07-19 35 views
1

我有加載數據,並通過次循環例如一個功能多處理過程中間輸出

def calculate_profit(account): 
    account_data = load(account) #very expensive operation 
    for day in account_data.days: 
     print(account_data.get(day).profit) 

因爲數據的加載是昂貴是有道理的使用JOBLIB /多到做這樣的事情:

arr = [account1, account2, account3, ...] 
joblib.Parallel(n_jobs=-1)(delayed(calculate_profit)(arr)) 

不過,我還有一個昂貴的功能,我想申請上calculate_profit函數的中間結果。例如,假設總結所有利潤並將其處理/張貼到網站/等是一項昂貴的操作。我還需要前一天的利潤來計算此功能中的利潤變化。

def expensive_sum(prev_day_profits, *account_profits): 
    total_profit_today = sum(account_profits) 
    profit_difference = total_profit_today - prev_day_profits 

    #some other expensive operation 
    #more expensive operations 

因此,我想

  1. 運行在並行多處理過程(以減輕負載的負載中的所有昂貴帳戶數據)
  2. 一旦每個多處理過程擊中預定點(例如完成了循環的一次迭代),返回這些中間值到另一個功能(expensive_sum)工藝 - 假設每個個體多處理過程不能繼續下去,直到expensive_sum回報
  3. 不過,我想保持多進程活着,這樣我就不必重新初始化它們(減少開銷)

有沒有辦法做到這一點?

+0

並阻止這一過程的持續,直到隊列的回報? – Michael

+0

我們討論'load()'和'expensive_sum()'的數據量是多少?因爲如果你要傳遞大量數據,你可能會失去多處理的好處,這是因爲Python在進程之間交換數據時所做的酸洗/取消芭蕾舞。 – zwer

+0

各個過程只是'put'值....隊列不返回 – vks

回答

1
from multiprocessing import Manager 
queue = manager.Queue() 

一旦每個多處理過程命中預定點 做

queue.put(item) 

與此同時,其他昂貴的功能確實

queue.get(item) ==> blocking call for get 

昂貴的功能等待上get和向前走當它得到一個值處理它ð再次等待上get

+0

感謝 - 你將如何貫徹落實「堵」要求得到什麼? – Michael

+0

@邁克爾其默認攔截...如果你想非阻塞你會使用'get_nowait()' – vks

+0

酷的感謝!對不起另一個問題 - 你是否從另一個進程調用queue.get?你將如何初始化它?例如,如果我有運行queue.put 10個進程(項目[0]),我該如何確保他們不繼續queue.put(項目[1])前處理(項[0],.. 。,item [1])結束? – Michael