2017-02-27 33 views
9

我嘗試將Java Streams的某些工作並行化。讓我們看看這個簡單的例子:Java Stream generator爲什麼無序?

Stream.generate(new Supplier<Integer>() { 
     @Override 
     public Integer get() { 
      return generateNewInteger(); 
     } 
    }) 
    .parallel() 
    .forEachOrdered(new Consumer<Integer>() { 
     @Override 
     public void accept(Integer integer) { 
      System.out.println(integer); 
     } 
    }); 

的問題是,它不叫accept方法forEachOrdered,但如果我使用forEach纔有效。我想問題是Stream.generate內部創建InfiniteSupplyingSpliterator沒有ORDERED特徵。

問題是爲什麼?我們似乎知道數據生成的順序。第二個問題是如何在並行流上執行forEachOrdered並生成流元素?

+3

它是無序的,因爲規範說明了這一點。它沒有完成的原因是它是*無限*,結合實現細節。 – Holger

+2

這可以用lambda表達式寫得更加簡潔。 –

+0

無關,但你可以做'.forEachOrdered(System.out :: println)' –

回答

10

最簡單的答案是,Stream.generate是無序的,因爲it’s specification這樣說。

這並不是說如果實現嘗試按順序處理項目,它實際上是相反的。一旦操作被定義爲無序,實現將盡可能從無序性中獲益。如果您在無序操作中遇到類似於源訂單的內容,則可能沒有辦法從無序處理中獲得好處,或者實現未使用所有機會。由於這可能會在將來的版本或替代實現中發生變化,因此如果操作已被指定爲無序,則不必依賴訂單。

Stream.generate定義爲無序後的意圖可能與訂購的Stream.iterate相比變得更清晰。傳遞給iterate的函數將接收到其之前的元素,因此元素之間存在先前後續的關係,因此是一個排序。通過Stream.generate的供應商不會收到以前的元素,換句話說,僅在考慮功能簽名時與先前的元素沒有關係。這適用於Stream.generate(() -> constant)Stream.generate(Type::new)類似用例,但Stream.generate(instance::statefulOp)類似,這似乎不是預期的主要用例。它仍然有效,如果操作是線程安全的並且您可以忍受流的無序性質。

之所以你的例子確實從來沒有取得進展是,forEachOrdered的實現實際上不考慮無序的自然,而是試圖分裂之後處理塊中相遇順序,即所有的子任務嘗試緩衝它們的元素,這樣一旦他們左邊的子任務完成,他們就可以將它們傳遞給行動。當然,緩衝和無限的資源並不能很好地協同工作,特別是因爲底層的InfiniteSupplyingSpliterator會分裂成無限的子任務。原則上,最左邊的任務可以將其元素直接提供給操作,但是這些任務似乎在隊列中的某個位置,等待被激活,這將不會發生,因爲所有工作線程都已經忙於處理其他無限子-任務。最終,如果你讓它運行足夠長的時間,整個操作將會打破OutOfMemoryError ...

相關問題