2017-01-14 46 views
4

我已經看到了一些用於Java 8流API的takeWhile實現,但它們似乎都將流轉換爲非並行流。例如this之一:如何在Java 8中爲Stream API實現並行支持takeWhile?

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) { 
    return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) { 
    boolean stillGoing = true; 
    @Override public boolean tryAdvance(Consumer<? super T> consumer) { 
     if (stillGoing) { 
     boolean hadNext = splitr.tryAdvance(elem -> { 
      if (predicate.test(elem)) { 
      consumer.accept(elem); 
      } else { 
      stillGoing = false; 
      } 
     }); 
     return hadNext && stillGoing; 
     } 
     return false; 
    } 
    }; 
} 

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { 
    return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); 
} 

這裏StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);轉動傳遞給takeWhile成連續流的流。是否有人知道支持並行流的實現,或者我如何修改此代碼以使其維護/支持並行流?

+3

你不能,真的。對不起,但你必須解決這個問題。這實際上是一種固有的順序操作。你可以使用默認的非常有限的並行性,這對所有的東西都適用,這就是你在流中使用'.parallel'所得到的結果,但是你可以得到。 –

+0

爲了在這裏提取任何真正的並行性,謂詞必須非常昂貴(例如,試圖分解非常大的數字)。這不是不可能的,但它不太可能。 –

回答

2

如果源被稱爲是無序的,那麼下面的實現應該工作:

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { 
    return StreamSupport.stream(UnorderedTakeWhileSpliterator<>(stream.spliterator(), predicate), stream.isParallel()); 
} 

有序實施將作爲更棘手:

static final class UnorderedTakeWhileSpliterator<T> implements Spliterator<T>, Consumer<T>, Cloneable { 
    private final Predicate<? super T> predicate; 
    private final AtomicBoolean checked = new AtomicBoolean(); 
    private Spliterator<T> source; 
    private T cur; 

    UnorderedTakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) { 
     this.predicate = predicate; 
     this.source = source; 
    } 

    @Override 
    public void accept(T t) { 
     this.cur = t; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super T> action) { 
     if (!checked.get() && source.tryAdvance(this)) { 
      if (predicate.test(cur)) { 
       action.accept(cur); 
       return true; 
      } else { 
       checked.set(true); 
      } 
     } 
     return false; 
    } 

    @Override 
    public Spliterator<T> trySplit() { 
     Spliterator<T> prefix = source.trySplit(); 
     if(prefix == null) { 
      return null; 
     } 
     if(checked.get()) { 
      return Spliterators.emptySpliterator(); 
     } 
     UnorderedTakeWhileSpliterator<T> clone; 
     try { 
      clone = (UnorderedTakeWhileSpliterator<T>) clone(); 
     } catch (CloneNotSupportedException e) { 
      throw new InternalError(e); 
     } 
     clone.source = prefix; 
     return clone; 
    } 

    @Override 
    public long estimateSize() { 
     return source.estimateSize(); 
    } 

    @Override 
    public int characteristics() { 
     return source.characteristics() & (DISTINCT | SORTED | NONNULL); 
    } 

    @Override 
    public Comparator<? super T> getComparator() { 
     return source.getComparator(); 
    } 
} 

以下方法創建流它應該緩衝非前綴項並將取消傳播到後綴。類似這樣的東西在JDK-9中實現(不是作爲分割器,而是作爲普通的流操作),但我懷疑即使是這種棘手的實現在很多情況下都勝過順序流。

+0

謝謝,但我似乎沒有名爲UnorderedTDOfRef的類?我在哪裏可以找到這個? – Johan

+1

@Johan,修好了,對不起。 –

+0

它的工作原理:)脫離主題,但這是什麼,你將包含在StreamEx庫?那將是真棒! – Johan