2016-03-02 32 views
5

我有一個重要的數據集,並且想要調用緩慢而乾淨的方法,並且比第一個調用結果有副作用的快速方法要快。我對中間結果不感興趣,所以我不想收集它們。在並行流上調用順序使所有以前的操作順序

明顯的解決方案是創建並行流,進行慢速呼叫,再次使流順序,並進行快速調用。問題是,在單線程中執行的所有代碼都沒有實際的並行性。

示例代碼:

@Test 
public void testParallelStream() throws ExecutionException, InterruptedException 
{ 
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); 
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed() 
      .parallel() 
      .map(this::slowOperation) 
      .sequential() 
      .map(Function.identity())//some fast operation, but must be in single thread 
      .collect(Collectors.toSet()) 
    ).get(); 
    System.out.println(threads); 
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size()); 
} 

private String slowOperation(int value) 
{ 
    try 
    { 
     Thread.sleep(100); 
    } 
    catch (InterruptedException e) 
    { 
     e.printStackTrace(); 
    } 
    return Thread.currentThread().getName(); 
} 

如果刪除sequential,代碼執行如預期的,但,很明顯,不平行的操作將在多個線程中調用。

您能否推薦一些有關此類行爲的參考文獻,或者可能採用某種方法來避免臨時收集?

回答

5

切換從parallel()的流sequential()曾在初始流API的設計,但造成了許多問題,最終的落實是changed,所以它只是變成並行標誌和關閉整個管道。當前文檔確實是模糊的,但它是在Java-9改進:

流管道取決於在其上調用的終端操作的流的模式順序地或並行地執行。可以使用BaseStream.isParallel()方法確定流的順序或並行模式,並且可以使用BaseStream.sequential()BaseStream.parallel()操作修改流的模式。最新的順序或並行模式設置適用於整個流管線的執行。

至於你的問題,你可以收集到一切中間List並開始新的順序管道:

new Random().ints(100).boxed() 
     .parallel() 
     .map(this::slowOperation) 
     .collect(Collectors.toList()) 
     // Start new stream here 
     .stream() 
     .map(Function.identity())//some fast operation, but must be in single thread 
     .collect(Collectors.toSet()); 
+1

您引用的句子在Java 8版本中完全相同,可以在類文檔的最後一個段落中找到。通常,您可以在[包文檔](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps)中找到更多信息(請參閱「並行性」 )而不是[特定方法](https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#parallel--),而不僅限於並行/順序模式(例如與減少量相比)。 – Holger

+0

斑點!我知道它是[更新](http://hg.openjdk.java.net/jdk9/dev/jdk/rev/d52b2d49bf04)(我甚至參加了討論並[相信](http://mail.openjdk。 java.net/pipermail/core-libs-dev/2015-August/034773.html)Stuart爲'concat'添加一個特別的註釋),但由於某種原因找到了錯誤的地方。發佈編輯。 –

1

在當前實現中,Stream可以全部並行或全部依次。雖然Javadoc不明確這一點,它可能在未來發生變化,但它確實表示這是可能的。

S並聯()

返回的等效流平行。可能會返回自己,因爲流已經是平行的,或者因爲基礎流狀態被修改爲平行。

如果你需要的功能是單線程,我建議你使用鎖定或同步塊/方法。

+0

感謝您的回覆,但同步方法成爲瓶頸和中間收集工作更快(確認由江鈴控股)。在這種特殊情況下,我對性能和內存更感興趣。 – the20login