1

我想用許多不同的參數組合來集成一個微分方程組,並存儲變量的最終值屬於一組參數。因此,我實現了一個簡單的for循環,其中創建了隨機初始條件和參數組合,整合了系統並將感興趣的值存儲在各個陣列中。 因爲我打算爲一個相當複雜的系統(這裏我只使用一個玩具系統進行說明)的許多參數組合來做到這一點,而這個系統也可能變得僵硬,我想通過並行化模擬來加速使用Python的「多處理「模塊。Python的多處理:爲幾組參數加速for循環,「apply」與「apply_async」

但是,當我運行模擬時,for循環總是比其並行版本更快。到目前爲止我發現的比for循環更快的唯一方法是使用「apply_async」而不是「apply」。對於10個不同的參數組合,我得到例如以下輸出(使用代碼從下面):

The for loop took 0.11986207962 seconds! 
[ 41.75971761 48.06034375 38.74134139 25.6022232 46.48436046 
    46.34952734 50.9073202 48.26035086 50.05026187 41.79483135] 
Using apply took 0.180637836456 seconds! 
41.7597176061 
48.0603437545 
38.7413413879 
25.6022231983 
46.4843604574 
46.3495273394 
50.9073202011 
48.2603508573 
50.0502618731 
41.7948313502 
Using apply_async took 0.000414133071899 seconds! 
41.7597176061 
48.0603437545 
38.7413413879 
25.6022231983 
46.4843604574 
46.3495273394 
50.9073202011 
48.2603508573 
50.0502618731 
41.7948313502 

雖然在該示例中,結果的順序是「應用」相同,並且「apply_async」,這似乎一般來說不是真實的。所以,我想使用「apply_async」,因爲它更快,但在這種情況下,我不知道如何將模擬的結果與我用於各自模擬的參數/初始條件相匹配。因此

我的問題是:

1)爲什麼是「應用」比簡單的for循環在這種情況下,多慢些?

2)當我使用「apply_async」而不是「apply」時,並行化版本變得比for循環快得多,但是我怎樣才能將仿真結果與我在各自仿真中使用的參數相匹配?

3)在這種情況下,「apply」和「apply_async」的結果具有相同的順序。這是爲什麼?巧合?

我的代碼可以發現如下:

from pylab import * 
import multiprocessing as mp 
from scipy.integrate import odeint 
import time 

#my system of differential equations 
def myODE (yn,tvec,allpara): 

    (x, y, z) = yn 

    a, b = allpara['para'] 

    dx = -x + a*y + x*x*y 
    dy = b - a*y - x*x*y 
    dz = x*y 

    return (dx, dy, dz) 

#for reproducibility  
seed(0) 

#time settings for integration 
dt = 0.01 
tmax = 50 
tval = arange(0,tmax,dt) 

numVar = 3 #number of variables (x, y, z) 
numPar = 2 #number of parameters (a, b) 
numComb = 10 #number of parameter combinations 

INIT = zeros((numComb,numVar)) #initial conditions will be stored here 
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here 
RES = zeros(numComb) #z(tmax) will be stored here 

tic = time.time() 

for combi in range(numComb): 

    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0 

    PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly 

    allpara = {'para': PARA[combi,:]} 

    results = transpose(odeint(myODE, INIT[combi,:], tval, args=(allpara,))) #integrate system 

    RES[combi] = results[numVar - 1][-1] #store z 

    #INIT[combi,:] = results[:,-1] #update initial conditions 
    #INIT[combi,-1] = 0 #set z to 0 

toc = time.time() 

print 'The for loop took ', toc-tic, 'seconds!' 

print RES 

#function for the multi-processing part 
def runMyODE(yn,tvec,allpara): 

    return transpose(odeint(myODE, yn, tvec, args=(allpara,))) 

tic = time.time() 

pool = mp.Pool(processes=4) 
results = [pool.apply(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)] 

toc = time.time() 

print 'Using apply took ', toc-tic, 'seconds!' 

for sol in range(numComb): 
    print results[sol][2,-1] #print final value of z 

tic = time.time()  
resultsAsync = [pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)]  
toc = time.time() 
print 'Using apply_async took ', toc-tic, 'seconds!' 

for sol in range(numComb): 
    print resultsAsync[sol].get()[2,-1] #print final value of z 
+0

正如指出的當前的答案,您的異步應用程序是假的,因爲您在打印出時間之前沒有讓工作完成。異步性的要點是避免在工作完成時阻塞調用線程。也就是說,並行循環處理獲得實際性能收益的第一條原則是確保每個線程/任務都有足夠的工作要做。與單線程for循環相比,執行這些任務的開銷調度線程更多,因此您必須通過在每次迭代中執行更多的操作來彌補開銷。 –

+0

例如,如果要並行化類似計算頂點法線的內容,如果循環的每次迭代只計算一個頂點的法線,則會損害性能。你希望每次迭代計算數千個正常值來彌補該線程調度開銷。以這種方式編寫代碼,以便每個線程都有非常豐富的工作要做,並且您將開始看到加速功能開始與您的硬件功能越來越成比例。 –

+0

感謝您的意見,艾克!我的 - 顯然 - 天真的想法是,如果我有一定數量的參數組合,並且在for循環中,它需要一個T來集成它們,如果我使用並行方法,則最終的時間爲〜T/4有4個核心。你是對的,在這個特殊的例子中,計算任務不是那麼「有趣」。然而,存在隨機參數導致系統僵硬的系統,這導致了較長的積分時間。在等待這樣一個僵硬的系統集成的同時,我想同時集成幾個非僵硬的系統。 – Cleb

回答

1

注意這樣一個事實,你apply_async是289倍的速度那麼for循環是有點懷疑!現在,您可以保證按照提交的順序獲得結果,即使這不是您想要的最大並行性。

apply_async啓動一個任務,它不會等到它完成; .get()這樣做。所以這個:

tic = time.time()  
resultsAsync = [pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)]  
toc = time.time() 

是不是一個非常公平的測量;你已經開始了所有的任務,但他們不一定完成。另一方面,一旦你.get()結果,你知道任務已經完成並且你有答案;這樣算下來,

for sol in range(numComb): 
    print resultsAsync[sol].get()[2,-1] #print final value of z 

意味着確保你有才能的結果(因爲你爲了和獲得()荷蘭國際集團他們經歷的ApplyResult對象);但您可能希望在準備就緒後立即獲得結果,而不是一次一個阻止等待步驟。但這意味着你需要用各種參數來標記結果。

您可以使用回調將結果保存,一旦任務完成,並與結果一起返回的參數,允許完全異步的回報:

def runMyODE(yn,tvec,allpara): 
    return allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,))) 

asyncResults = [] 

def saveResult(result): 
    asyncResults.append((result[0], result[1][2,-1])) 

tic = time.time() 
for combi in range(numComb): 
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]}), callback=saveResult) 
pool.close() 
pool.join() 
toc = time.time() 

print 'Using apply_async took ', toc-tic, 'seconds!' 

for res in asyncResults: 
    print res[0], res[1] 

爲您提供了一個更合理的時間;結果仍然幾乎總是爲了因任務需要的時間非常相似金額:

Using apply took 0.0847041606903 seconds! 
[ 6.02763376 5.44883183] 41.7597176061 
[ 4.37587211 8.91773001] 48.0603437545 
[ 7.91725038 5.2889492 ] 38.7413413879 
[ 0.71036058 0.871293 ] 25.6022231983 
[ 7.78156751 8.70012148] 46.4843604574 
[ 4.61479362 7.80529176] 46.3495273394 
[ 1.43353287 9.44668917] 50.9073202011 
[ 2.64555612 7.74233689] 48.2603508573 
[ 0.187898 6.17635497] 50.0502618731 
[ 9.43748079 6.81820299] 41.7948313502 
Using apply_async took 0.0259671211243 seconds! 
[ 4.37587211 8.91773001] 48.0603437545 
[ 0.71036058 0.871293 ] 25.6022231983 
[ 6.02763376 5.44883183] 41.7597176061 
[ 7.91725038 5.2889492 ] 38.7413413879 
[ 7.78156751 8.70012148] 46.4843604574 
[ 4.61479362 7.80529176] 46.3495273394 
[ 1.43353287 9.44668917] 50.9073202011 
[ 2.64555612 7.74233689] 48.2603508573 
[ 0.187898 6.17635497] 50.0502618731 
[ 9.43748079 6.81820299] 41.7948313502 

注意,而不是遍歷應用,你也可以使用地圖:

pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, para=PARA[combi,:]), range(numComb), callback=saveResult) 
+0

非常感謝您的詳細回覆!我會測試它。 – Cleb

+0

我沒有得到這個「map_async」行來運行。我所做的就是替換範圍內的組合(numComb): pool.apply_async(runMyODE,args =(INIT [combi,:],tval,{'para':PARA [combi ,:}])), callback = saveResult)「由」pool.map_async「中的行打印,但沒有打印任何內容。有什麼建議麼?你可以編輯這個例子的代碼嗎?!謝謝!此外,pool.close()和pool.join()需要還是「唯一」的好樣式? – Cleb

+0

@Cleb:在一些相當近期的Python版本中,看起來像map_async()中存在一個bug,其中回調參數被忽略(這會導致結果列表爲空,這就是你所看到的)。最新(或足夠老!)版本應該工作。至於close()和join(),這就是你想要整理的東西,這意味着地圖也會被完成,但是你可以簡單地在所有結果上調用wait(),或者(如果map_async起作用)從map_async獲取一個返回值並返回。這也值得嘗試map()。 –