2014-03-31 78 views
1

我的代碼Scala的快速方法是相同的:並行採集

def iterate(prev: Vector[Int], acc: Int): Vector[Int] = { 
    val next = (for { i <- 1.to(1000000) } 
    yield (prev(Random.nextInt(i)))).toVector 

    if (acc < 20) iterate(next, acc + 1) 
    else next 
} 
iterate(1.to(1000000).toVector, 1) 

對於大量的迭代,它確實對集合的操作,併產生價值。在迭代結束時,它將所有內容都轉換爲矢量。最後,它繼續進行下一次遞歸自我調用,但是直到它完成所有迭代後才能繼續進行。遞歸自我呼叫的數量非常小。

我想將其並列化,所以我嘗試在1.to(1000000)範圍內使用.par。這使用了8個進程而不是1個,結果只有兩倍! .toParArray只比.par略快。有人告訴我,如果我使用了不同的東西(比如ThreadPool),速度會更快 - 這是有道理的,因爲所有的時間都花在構建next上,我假設將不同進程的輸出連接到共享內存不會導致即使對於非常大的輸出也是巨大的減速(這是一個關鍵假設,可能是錯誤的)。我該怎麼做?如果你提供的代碼,將我給出的代碼進行並列化就足夠了。

請注意,我給出的代碼不是我的實際代碼。我的實際代碼更長更復雜(Held-Karp算法用於約束,BitSets和更多東西的TSP),唯一顯着的區別是在我的代碼中,prev的類型是ParMap,而不是Vector

編輯,額外的信息:ParMap在我可以處理的最大樣本量的最差迭代中有350k個元素,否則它通常是5k-200k(在對數標度上變化)。如果它本身需要很多時間來將流程的結果連接成一個單獨的流程(我認爲這是發生了什麼),那麼我就沒有什麼可以做的了,但我相當懷疑這是事實。原來,在問題提出後

+0

這裏不太容易知道'par'出了什麼問題。 'par'確實有開銷,並不是重要元素的數量 - 大部分開銷是每個元素,因此您可以與小集合一樣支付與大元素同等的比例。您每個元素所做的工作量都會影響到您的工作量。如果每個元素所做的工作量非常小,則開銷會相應增加,而「par」可能會讓事情變得更糟。當你爲每個元素做大量的工作(更可能是一個較小的集合)'par'更有可能是有益的。 – wingedsubmariner

+0

問題可能也與您的實現本身有關。如果你與一個可變數據結構交互,你可能會有一個錯誤的共享問題(或真正的共享問題!)。如果你用鎖來觸摸任何東西,你的代碼將不會真正並行運行。你的CPU使用率是多少?代碼是否與計算機的所有內核掛鉤,還是仍然有很多空閒時間? – wingedsubmariner

+0

我基於此:http://docs.scala-lang.org/overviews/parallel-collections/performance.html CPU使用率爲每個CPU 65-90%。 我與之交互的是共享的'ParMap',它是不可變的。我擔心的是,在迭代結束時,將來自所有不同進程的輸出連接成一個大的ParMap需要太多時間。我在我的帖子中編輯了關於樣本大小的信息。 –

回答

0

實現幾個版本,

因此

import scala.collection.mutable.ArrayBuffer 
import scala.collection.parallel.mutable.ParArray 
import scala.util.Random 

// Original 
def rec0() = { 
    def iterate(prev: Vector[Int], acc: Int): Vector[Int] = { 
    val next = (for { i <- 1.to(1000000) } 
     yield (prev(Random.nextInt(i)))).toVector 

    if (acc < 20) iterate(next, acc + 1) 
    else next 
    } 
    iterate(1.to(1000000).toVector, 1) 
} 

// par map 
def rec1() = { 
    def iterate(prev: Vector[Int], acc: Int): Vector[Int] = { 
    val next = (1 to 1000000).par.map { i => prev(Random.nextInt(i)) }.toVector 

    if (acc < 20) iterate(next, acc + 1) 
    else next 
    } 
    iterate(1.to(1000000).toVector, 1) 
} 

// ParArray par map 
def rec2() = { 
    def iterate(prev: ParArray[Int], acc: Int): ParArray[Int] = { 
    val next = (1 to 1000000).par.map { i => prev(Random.nextInt(i)) }.toParArray 

    if (acc < 20) iterate(next, acc + 1) 
    else next 
    } 
    iterate((1 to 1000000).toParArray, 1).toVector 
} 

// Non-idiomatic non-parallel 
def rec3() = { 
    def iterate(prev: ArrayBuffer[Int], acc: Int): ArrayBuffer[Int] = { 

    var next = ArrayBuffer.tabulate(1000000){i => i+1} 
    var i = 0 
    while (i < 1000000) { 
     next(i) = prev(Random.nextInt(i+1)) 
     i = i + 1 
    } 

    if (acc < 20) iterate(next, acc + 1) 
    else next 
    } 
    iterate(ArrayBuffer.tabulate(1000000){i => i+1}, 1).toVector 
} 

然後在平均經過時間有點測試,

def elapsed[A] (f: => A): Double = { 
    val start = System.nanoTime() 
    f 
    val stop = System.nanoTime() 
    (stop-start)*1e-6d 
} 

val times = 10 
val e0 = (1 to times).map { i => elapsed(rec0) }.sum/times 
val e1 = (1 to times).map { i => elapsed(rec1) }.sum/times 
val e2 = (1 to times).map { i => elapsed(rec2) }.sum/times 
val e3 = (1 to times).map { i => elapsed(rec3) }.sum/times 

// time in ms. 
e0: Double = 2782.341 
e1: Double = 2454.828 
e2: Double = 3455.976 
e3: Double = 1275.876 

表明非慣用的非平行版本證明在平均最快的。也許對於較大的輸入數據,平行的慣用版本可能是有益的。