2014-03-12 28 views
30

見簡單的例子,下面計數每個單詞的出現次數的在列表中的號碼:並行流,收藏家和線程安全

Stream<String> words = Stream.of("a", "b", "a", "c"); 
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1, 
                 (i, j) -> i + j)); 

最後,wordsCount{a=2, b=1, c=1}

但我流是非常大的,我想parallelise的工作,所以我寫:

Map<String, Integer> wordsCount = words.parallel() 
             .collect(toMap(s -> s, s -> 1, 
                 (i, j) -> i + j)); 

但是我注意到,wordsCount是一個簡單的HashMap所以我不知道如果我需要明確要求同時地圖,以保證線程安全:

Map<String, Integer> wordsCount = words.parallel() 
             .collect(toConcurrentMap(s -> s, s -> 1, 
                   (i, j) -> i + j)); 

可以非併發收集器安全地與並行流使用,或者我應該只使用併發版本從並行流收集什麼時候?

回答

32

非併發收集器是否可以安全地與並行流一起使用,還是應該只在從並行流中收集時使用併發版本?

在並行流的collect操作中使用非併發收集器是安全的。

Collector接口的specification,與半打要點的部分,是這樣的:

對於非併發收集器的,任何結果從結果供應商,蓄電池,或組合函數返回必須串行限制線程。這使收集能夠並行發生,而收集器不需要實施任何額外的同步。 Reduce實現必須管理輸入是否正確分區,分區是否被隔離處理,並且僅在累積完成後纔會發生組合。

這意味着由Collectors類提供的各種實施方式可以用並行流一起使用,即使其中一些實施方式中可能不是同時的收集器。這也適用於您可能實現的任何您自己的非併發收集器。如果您的收集器不干擾流源,不受副作用,獨立訂單等的影響,它們可以安全地用於並行流。

我也推薦閱讀java.util.stream的Mutable Reduction部分包文檔。在本節的中間是一個聲明爲可並行化的示例,但它會將結果收集到不是線程安全的ArrayList中。

工作方式是以並行收集器結尾的並行流確保不同線程始終在中間結果集合的不同實例上運行。這就是爲什麼收集器具有Supplier函數的原因,用於創建與線程一樣多的中間集合,因此每個線程都可以累積到自己的線程中。當中間結果被合併時,它們在線程之間安全地切換,並且在任何給定時間只有一個線程合併任何一對中間結果。

8

在並行流中使用非併發集合和非原子計數器是安全的。

如果你看一看的Stream::collect的文檔,你會發現以下段落:

reduce(Object, BinaryOperator),收集操作可無需額外的同步並行。

而對於方法Stream::reduce

雖然這可能看起來比在一個循環的運行總計簡單突變來執行聚集的更迂迴的方式,減少操作並行更優雅,而無需額外同步並大大降低數據競爭的風險。

這可能有點令人驚訝。但是,請注意並行流基於分叉加入模型。這意味着併發執行的工作原理如下:

  • 分裂序列分成兩個部分差不多大小
  • 過程中的每個部分單獨
  • 收集兩部分的結果,並結合成一個結果

在第二步中,三個步驟遞歸地應用於子序列。

一個例子應該說清楚。該

IntStream.range(0, 4) 
    .parallel() 
    .collect(Trace::new, Trace::accumulate, Trace::combine); 

跟蹤的唯一目的是記錄的構造函數和方法調用。如果執行該語句,它打印以下行:

thread: 9/operation: new 
thread: 10/operation: new 
thread: 10/operation: accumulate 
thread: 1/operation: new 
thread: 1/operation: accumulate 
thread: 1/operation: combine 
thread: 11/operation: new 
thread: 11/operation: accumulate 
thread: 9/operation: accumulate 
thread: 9/operation: combine 
thread: 9/operation: combine 

可以看到,四個跟蹤對象被創建,積累每個對象已被調用一次,結合一直使用三次將四個對象合併成一個。每個對象一次只能由一個線程訪問。這使得代碼是線程安全的,這同樣適用於方法Collectors :: toMap

16

所有收集器如果遵循規範中的規則,可以安全地並行或順序運行。並行準備是這裏設計的關鍵部分。

併發和非併發收集器之間的區別與並行化的方法有關。

一個普通的(非併發)收集器通過合併子結果來操作。因此,源代碼被分割成一堆塊,每個塊被收集到一個結果容器(如列表或地圖)中,然後將子結果合併到一個更大的結果容器中。這是安全和有序的保存,但對於某些類型的容器 - 特別是地圖 - 可能會很昂貴,因爲按鍵合併兩個地圖通常很昂貴。

併發收集器改爲創建一個結果容器,其插入操作保證是線程安全的,並將元素從多線程中發送出去。使用像ConcurrentHashMap這樣高度併發的結果容器,這種方法可能比合並普通HashMap更好。

因此,併發收集者嚴格優化其普通對象。而且他們不會沒有成本;因爲元素是從很多線程中被爆炸出來的,所以併發收集者通常不能保存遇到命令。 (但是,您通常不關心 - 創建字數統計直方圖時,您並不關心首先計算哪個「富」實例)。