2016-02-05 70 views
6

我在想,當我創建了自己的無限流與Stream.generate怎麼流,兩者都在標準庫停止...流如何停止?

例如,當你有記錄的列表:

List<Record> records = getListWithRecords(); 
records.stream().forEach(/* do something */); 

流將不會無限且永遠運行,但當遍歷列表中的所有項目時,它將停止。但這是如何工作的? Files.lines(path)創建的數據流具有相同的功能(來源:http://www.mkyong.com/java8/java-8-stream-read-a-file-line-by-line/)。

第二個問題,如何用Stream.generate創建的流以同樣的方式停止呢?

回答

9

有限流根本不是通過Stream.generate創建的。

實現流的標準方法是實現Spliterator,有時使用the Iterator detour。在任何一種情況下,該實現都有報告結束的方式,例如,當Spliterator.tryAdvance返回false或其forEachRemaining方法剛剛返回時,或者在Iterator源的情況下,hasNext()返回false

A Spliterator甚至可能會在處理開始之前報告預期的元素數量。

流通過的Stream界面內的工廠方法創建的,像Stream.generate可以或者實施,由Spliterator以及或使用流實現的內部特徵,但不管它們是如何實現的,你不」不管這個實現如何改變它們的行爲,所以使這種流有限的唯一方法是將limit操作鏈接到流。

如果您想要創建一個非空的有限數據流,但不支持數組或集合,並且現有流源都不適合,則必須實現您自己的Spliteratorcreate a stream out of it。如上所述,您可以使用現有方法在Iterator之外創建Spliterator,但您應該抵制使用Iterator的誘惑,因爲它很熟悉。一個Spliterator並不難實現:

/** like {@code Stream.generate}, but with an intrinsic limit */ 
static <T> Stream<T> generate(Supplier<T> s, long count) { 
    return StreamSupport.stream(
       new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) { 
     long remaining=count; 

     public boolean tryAdvance(Consumer<? super T> action) { 
      if(remaining<=0) return false; 
      remaining--; 
      action.accept(s.get()); 
      return true; 
     } 
    }, false); 
} 

以此爲起點,您可以爲Spliterator接口,加權開發費用的default方法和潛在的性能改進,例如增加覆蓋

static <T> Stream<T> generate(Supplier<T> s, long count) { 
    return StreamSupport.stream(
       new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) { 
     long remaining=count; 

     public boolean tryAdvance(Consumer<? super T> action) { 
      if(remaining<=0) return false; 
      remaining--; 
      action.accept(s.get()); 
      return true; 
     } 

     /** May improve the performance of most non-short-circuiting operations */ 
     @Override 
     public void forEachRemaining(Consumer<? super T> action) { 
      long toGo=remaining; 
      remaining=0; 
      for(; toGo>0; toGo--) action.accept(s.get()); 
     } 
    }, false); 
} 
+1

爲什麼要避免使用迭代器來定義spliterator?我剛剛看到BufferedReader.lines()使用這種方法來創建他的有限流。 – Juru

+5

'BufferedReader.lines()'就是一個很好的例子。看看next()和['hasNext()']的實現(http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/io /BufferedReader.java?av = f#566)以及它們如何在調用之間保持狀態。相反,分割器很簡單,只需要一個方法:'tryAdvance(Consumer <?super String> c){String line = readLine(); if(line == null)返回false; c.accept(線);返回true; ''就是這樣。更容易實現(添加異常處理,仍然是代碼大小的一半),不需要包裝... – Holger

+0

該實現是線程安全的嗎?它需要成爲? – WillD

0

我已經創建了一個通用的辦法解決這個

​​

用法很簡單:

GuardedSpliterator<Integer> source = new GuardedSpliterator<>(
    () -> rnd.nextInt(), 
    (i) -> i > 10, 
    true 
); 

Stream<Integer> ints = StreamSupport.stream(source, false); 

ints.forEach(i -> System.out.println(i));