2013-08-20 50 views
2

我的問題與Count occurrences of each element in a List[List[T]] in Scala非常相似,只是我想要一個涉及parallel collections的高效解決方案。Scala並行集合中每個項目的發生次數

具體而言,我有個整數的一個大的(〜10^7)矢量的短vec(〜10)列表和我想獲得每個詮釋xx發生時,例如作爲Map[Int,Int]的次數。不同整數的數量是10^6的數量級。由於需要完成的機器具有相當數量的內存(150GB)和內核數量(> 100),因此平行集合似乎是一個不錯的選擇。下面的代碼是一個好方法嗎?

val flatpvec = vec.par.flatten 
val flatvec = flatpvec.seq 
val unique = flatpvec.distinct 
val counts = unique map (x => (x -> flatvec.count(_ == x))) 
counts.toMap 

還是有更好的解決方案?如果你想知道的.SEQ轉化:由於某種原因,下面的代碼似乎並沒有終止,即使是小例子:

val flatpvec = vec.par.flatten 
val unique = flatpvec.distinct 
val counts = unique map (x => (x -> flatpvec.count(_ == x))) 
counts.toMap 

回答

3

此做一些事情。 aggregate就像fold除了你也結合順序摺疊的結果。

更新:在.par.groupBy中有開銷並不奇怪,但我對常數因素感到驚訝。通過這些數字,你永遠不會那樣計數。另外,我不得不通過記憶的方式。

有趣的技術,用於構建結果圖is described in this paper鏈接從the overview。 (它聰明地保存了中間結果,然後在最後並行合併它們)。

但是,如果你真的想要的只是一個計數,那麼複製groupBy的中間結果會很昂貴。

這些數字比較順序groupBy,並行,最後是aggregate

[email protected]:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test 
GroupBy: Starting... 
Finished in 12695 
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000)) 
Par GroupBy: Starting... 
Finished in 51481 
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000)) 
Aggregate: Starting... 
Finished in 2672 
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000)) 

測試代碼中沒有什麼神奇的東西。

import collection.GenTraversableOnce 
import collection.concurrent.TrieMap 
import collection.mutable 

import concurrent.duration._ 

trait Timed { 
    def now = System.nanoTime 
    def timed[A](op: =>A): A = { 
    val start = now 
    val res = op 
    val end = now 
    val lapsed = (end - start).nanos.toMillis 
    Console println s"Finished in $lapsed" 
    res 
    } 
    def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = { 
    Console println s"$title: Starting..." 
    val res = timed(op) 
    //val showable = res.toIterator.min //(res.toIterator take 10).toList 
    val showable = res.toList.sorted take 10 
    Console println s"$title: $showable" 
    } 
} 

它生成一些感興趣的隨機數據。

object Test extends App with Timed { 

    val upto = math.pow(10,6).toInt 
    val ran = new java.util.Random 
    val ten = (1 to 10).toList 
    val maxSamples = 1000 
    // samples of ten random numbers in the desired range 
    val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto)) 
    // pick a sample at random 
    def anyten = samples(ran nextInt maxSamples) 
    def mag = 7 
    val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten) 

的順序操作和aggregate組合操作是從任務調用,並將結果指定給易失性變種。

def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int] 
    def so(m: mutable.Map[Int,Int], is: List[Int]) = { 
    for (i <- is) { 
     val v = m.getOrElse(i, 0) 
     m(i) = v + 1 
    } 
    m 
    } 
    def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = { 
    for ((i, count) <- n) { 
     val v = m.getOrElse(i, 0) 
     m(i) = v + count 
    } 
    m 
    } 
    showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) }) 
    showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) }) 
    showtime("Aggregate", data.par.aggregate(z)(so, co)) 
} 
+0

有趣,但是不會導致爲'data'中的每個元素創建一個Map嗎? – mitchus

+0

@mitchus z更有意義,因爲z是可變的,所以每個順序操作一個映射,這是一個單線程任務,但我懶得解決它。我會把它放在我的身上。 –

+0

@mitchus更新爲使用可變結果,這只是起作用。看到令人驚訝的數字。或者,也許他們並不奇怪。 –

2

如果你想使用並行收集和Scala的標準工具,你可以這樣做。集團您的收藏由身份,然後將其映射到(價值,計數):

scala> val longList = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7) 
longList: List[Int] = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)                        

scala> longList.par.groupBy(x => x) 
res0: scala.collection.parallel.immutable.ParMap[Int,scala.collection.parallel.immutable.ParSeq[Int]] = ParMap(5 -> ParVector(5), 1 -> ParVector(1, 1), 2 -> ParVector(2, 2, 2), 7 -> ParVector(7, 7, 7), 3 -> ParVector(3, 3, 3), 4 -> ParVector(4))                  

scala> longList.par.groupBy(x => x).map(x => (x._1, x._2.size)) 
res1: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)           

甚至像pagoda_5b更好的意見建議:

scala> longList.par.groupBy(identity).mapValues(_.size) 
res1: scala.collection.parallel.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1) 
+0

好主意,我會試試。 – mitchus

+2

您可以使用'identity(_)'函數作爲'groupBy'的參數而不是'x => x'的一些小改進。你也可以用'mapValues(_「)來映射分組'Map'的值。大小)' –

+0

偉大的建議,pagoda_5b。我將它添加到答案中。 :) –

相關問題