2016-05-26 94 views
1

1)如何使用供應商(supplier)並行創建N個大小的N個值流,同時確保向供應商提供不超過N個調用?我需要這個,因爲我有一個昂貴的supplier.get()操作供應商。限制無限並行流

2)對我的問題Streams.generate(supplier).limit(N)的'顯而易見的'答案不起作用,並且通常會導致超過N個呼叫提供給供應商。爲什麼是這樣?

由於事實「證據」是Streams.generate(supplier).limit(N)導致N多調用supplier.get(),考慮下面的代碼:

public class MWE { 
    static final int N_ELEMENTS=100000; 
    static Supplier<IntSupplier> mySupplier =() -> new IntSupplier() { 
     AtomicInteger ai = new AtomicInteger(-1); 
     @Override 
     public int getAsInt() { 
      return ai.incrementAndGet(); 
     } 
    }; 
    public static void main(String[] args) { 
     int[] a = IntStream.generate(mySupplier.get()).limit(N_ELEMENTS).toArray(); 
     int[] b = IntStream.generate(mySupplier.get()).parallel().limit(N_ELEMENTS).toArray(); 
    } 
} 

a等於[0, 1, ..., N_ELEMENTS-1]如預期,但違背了你所期望的b什麼不包含與a相同的元素。相反,b通常包含大於或等於N_ELEMENTS的元素,這表示多於N_ELEMENTS給供應商的呼叫數量。

另一個例子是Streams.generate(new Random(0)::nextDouble()).limit(5)並不總是生成相同的一組數字。

+0

b含有大於或等於N_元件?你有多少個核心? –

+2

你有沒有試過把'.parallel()'調用放在'limit'而不是之前? –

+1

我認爲@LouisWasserman(理論上)並不重要。 – Tunaki

回答

0

調用.limit()不能保證導致由供應商產生的前N個元素的流,因爲Stream.generate()創建一個unordered流,這讓limit()自由地在保持流的是什麼「部分」作出決定。實際上,引用「前N個元素」或「流的第一個部分」在語義上甚至不是語義上的,因爲流是無序的。這種行爲在API文檔中有明確的規定;非常感謝所有向我指出這一點的人!

既然問了這個問題,我對自己的問題提出了兩個解決方案。我感謝Tagir讓我走向正確的方向。

解決方案1:誤用IntStream.range()

比是絕對必要創建無序的,尺寸,平行通過使沒有更多的呼叫到供應商供應商支持的流的簡單且相當有效的方法是(MIS)使用IntStream.range()這樣的:

IntStream.range(0,N_ELEMENTS).parallel().mapToObj($ -> generator.get()) 

基本上,我們使用IntStream.range()僅創建可以並行地處理一個尺寸的流。

解決方案2:自定義spliterator

因爲我們從來沒有真正使用由IntStream.range()創建的流內的整數,這似乎是我們可以做到通過創建自定義Spliterator稍好:

final class SizedSuppliedSpliterator<T> implements Spliterator<T> { 
    private int remaining; 

    private final Supplier<T> supplier; 

    private SizedSuppliedSpliterator(Supplier<T> supplier, int remaining) { 
     this.remaining = remaining; 
     this.supplier = supplier; 
    } 

    static <T> SizedSuppliedSpliterator of(Supplier<T> supplier, int limit) { 
     return new SizedSuppliedSpliterator(supplier, limit); 
    } 

    @Override 
    public boolean tryAdvance(final Consumer<? super T> consumer) { 
     Objects.requireNonNull(consumer); 
     if (remaining > 0) { 
      remaining--; 
      final T supplied = supplier.get(); 
      consumer.accept(supplied); 
      return true; 
     } 
     return false; 
    } 

    @Override 
    public void forEachRemaining(final Consumer<? super T> consumer) { 
     while (remaining > 0) { 
      consumer.accept(supplier.get()); 
      remaining--; 
     } 
    } 

    @Override 
    public SizedSuppliedSpliterator<T> trySplit() { 
     int split = (int)remaining/2; 
     remaining -= split; 
     return new SizedSuppliedSpliterator<>(supplier, split); 
    } 

    @Override 
    public long estimateSize() { 
     return remaining; 
    } 

    @Override 
    public int characteristics() { 
     return SIZED | SUBSIZED | IMMUTABLE; 
    } 
} 

我們可以使用這個分割器來創建流如下:

StreamSupport.stream(SizedSuppliedSpliterator.of(supplier, N_ELEMENTS), true) 

當然,計算幾個整數並不昂貴,而且我也沒有注意到甚至測量出解決方案1的任何性能改進。

4

流API不保證IntStream.generate()將調用生成器指定的次數。此次調用並不尊重排序。

如果您實際上需要並行數量不斷增加的數字,那麼使用IntStream.range(0, N_ELEMENTS).parallel()會更好。這不僅確保您實際上擁有從0N_ELEMENTS-1的所有號碼,而且可以大大減少爭用和保證訂單。如果您需要生成更復雜的東西,請考慮使用自定義源定義您自己的Spliterator類。

請注意,建議的IntStream.iterate解決方案可能不會並行化,因爲它是按順序排列的源。

+1

「*此次調用不尊重排序*」是一個誤導性句子。這聽起來像是有不受尊重的東西,但實際上,'Stream.generate'根據定義生成* unordered *流。由於溪流沒有秩序,因此沒有任何不尊重的地方。這裏的問題是有狀態的'供應商'。 – Holger

+0

我現在意識到generate()會生成一個無序的流。但是,我實際上並不需要越來越多的數字 - 越來越多的數字只是爲了說明問題。實際上我需要需要很長時間才能構建的對象流。我將研究Spliterator課程。 – Semafoor

+0

@Semafoor,或許你應該問一下你的實際問題,而不是試圖發明一個新的問題,哪個解決方案不適合你。 –