2013-10-08 36 views
1

我有一個大批量的並行計算,我在scala中使用了一個並行地圖​​。我注意到,隨着工作人員的完成,CPU使用率似乎逐漸下降。這一切都歸結到一個電話到呼叫Map對象覆蓋scala中的默認並行集合行爲

scala.collection.parallel.thresholdFromSize(length, tasksupport.parallelismLevel) 

看代碼裏面,我看到:

def thresholdFromSize(sz: Int, parallelismLevel: Int) = { 
    val p = parallelismLevel 
    if (p > 1) 1 + sz/(8 * p) 
    else sz 
} 

我的計算運用了大量核大,現在我明白了爲什麼..

thesholdFromSize(1000000,24) = 5209 
thesholdFromSize(1000000,4) = 31251 

如果我有長度1000000 24 CPU的數組將下降到5209元一路分區。如果我將同一陣列傳遞到4 CPU機器上的並行集合,它將停止分區爲31251個元素。

應該指出,我的計算的運行時間不統一。每個單元的運行時間可以高達0.1秒。在31251項,即3100秒,或52分鐘的時間裏,其他工人可以加入並抓取工作,但不是。我在觀察並行計算期間的CPU利用率時觀察到了這種行爲。很顯然,我很想在大型機器上運行,但這並非總是可能。

我的問題是這樣的:有什麼辦法來影響並行集合,使其更適合我的問題的閾值數量更小?我能想到的唯一事情就是自己實現類'Map',但這看起來像是一個非常不雅的解決方案。

回答

0

想要閱讀Configuring Scala parallel collections。特別是,你可能需要實現一個TaskSupport特性。

+0

讓我們假設我已經閱讀過(這是什麼導致我開始挖掘代碼)。我幾乎沒有發現如何實際執行TaskSupport trait – fbl

+0

@fbl啊。那麼我可能無法提供更多的幫助: - \ –

0

我認爲,所有你需要做的是這樣的:

yourCollection.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(24)) 

parallelism參數默認爲你的CPU核心數量,但是你可以重寫它像上面。這也顯示在ParIterableLike的來源中。

+0

但是這不會產生24名工人嗎?我的機器上只有4個內核,所以我只需要4個工作人員。問題在於,對於非常大的輸入大小,一些工作人員完成速度顯着快於其他人,這似乎受原始問題中提到的thresholdFromSize()函數的支配。 – fbl

0

0.1秒是足夠大的時間足以分開處理它。將每個單元(或10個單元)的處理封裝在單獨的Runnable中,並將其全部提交給FixedThreadPool。另一種方法是使用ForkJoinPool - 然後更容易控制所有計算的結束。