0

我有這個數組定義如下更新經由Scala的並行類別陣列

var distinctElementsDefinitionMap: scala.collection.mutable.ArrayBuffer[HashMap[String, Int]] = new scala.collection.mutable.ArrayBuffer[HashMap[String, Int]](300) with scala.collection.mutable.SynchronizedBuffer[HashMap[String, Int]] 

現在HashMap中,我有300個元素

val max_length = 300 
val columnArray = (0 until max_length).toParArray 
import scala.collection.parallel.ForkJoinTaskSupport 
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100)) 
columnArray foreach(i => { 
    // Do Some Computation and get a HashMap 
    var distinctElementsMap: HashMap[String, Int] = //Some Value 
    //This line might result in Concurrent Access Exception 
    distinctElementsDefinitionMap.update(i, distinctElementsMap) 
}) 

的並行集合我現在運行計算密集任務在上面定義的columnArrayforeach循環內。 計算完成後,我希望每個線程更新distinctElementsDefinitionMap陣列的特定條目。 每個線程只會更新特定的索引值,對於執行它的線程來說是唯一的。 我想知道這個數組條目的更新是否安全,並且可能同時寫入多個線程? 如果不是有​​這樣做的方式,所以它是線程安全的? 謝謝!

更新: 看來這並不是安全的做法。我得到一個java.util.ConcurrentModificationException 關於如何在使用並行集合時避免這種情況的任何提示。

+0

你在濫用平行集合 - 它不是一個時髦的普通線程池,而是將處理切換到聰明的池(工作偷ftw!),並避免使用副作用,然後使用處理結果(可能以單線程方式)。再一次,它是一個**平行**集合,而不是**併發**。也許你可以給我們一個你想要歸檔的東西的更大圖景? –

+0

我完全同意,我知道我在做什麼並不是最理想的方式,甚至是一個好方法。但我僅僅是斯卡拉的初學者,仍然在尋找方向。但我需要一個平行循環,這是我想到的唯一方法。 道歉的基本方法! – MV23

+0

不用擔心,但不太清楚爲什麼在完成每項任務後需要更新地圖中的條目。如果你澄清一下,也許我們可以想出另一種習慣解決方案。 –

回答

0

使用.groupBy操作,據我判斷it is parallelized(不像其他一些方法,如.sorted

case class Row(a: String, b: String, c: String) 
val data = Vector(
    Row("foo", "", ""), 
    Row("bar", "", ""), 
    Row("foo", "", "") 
) 

data.par.groupBy(x => x.a).seq 
// Map(bar -> ParVector(Row(bar,,)), foo -> ParVector(Row(foo,,), Row(foo,,))) 

希望你有這個想法。

或者,如果你的RAM允許你並行處理每一列而不是行,它必須比當前的方法更有效率(更少的爭用)。

val columnsCount = 3 // 300 in your case 
Vector.range(0, columnsCount).par.map { column => 
    data.groupBy(row => row(column)) 
}.seq 

雖然即使單列(8M行可能會相當多),您可能仍會遇到內存問題。