我也想爲CompletableFutures
的碼流實施Spliterator
拍一下,所以這裏是我的嘗試。
需要注意的是,如果你是在平行模式使用此,要注意使用不同的ForkJoinPool
爲流和本CompletableFuture
的背後運行的任務。該流將等待未來完成,因此,如果共享相同的執行程序,甚至發生死鎖,您實際上可能會失去性能。
因此,這裏是實現:
public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) {
return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel);
}
public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream,
boolean parallel) {
return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity());
}
public static class CompletableFutureSpliterator<T> implements Spliterator<T> {
private List<CompletableFuture<? extends T>> futures;
CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) {
futures = stream.collect(Collectors.toList());
}
CompletableFutureSpliterator(CompletableFuture<T>[] futures) {
this.futures = new ArrayList<>(Arrays.asList(futures));
}
CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) {
this.futures = new ArrayList<>(futures);
}
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
if (futures.isEmpty())
return false;
CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join();
// now at least one of the futures has finished, get its value and remove it
ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size());
while (it.hasPrevious()) {
final CompletableFuture<? extends T> future = it.previous();
if (future.isDone()) {
it.remove();
action.accept(future.join());
return true;
}
}
throw new IllegalStateException("Should not reach here");
}
@Override
public Spliterator<T> trySplit() {
if (futures.size() > 1) {
int middle = futures.size() >>> 1;
// relies on the constructor copying the list, as it gets modified in place
Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle));
futures = futures.subList(middle, futures.size());
return result;
}
return null;
}
@Override
public long estimateSize() {
return futures.size();
}
@Override
public int characteristics() {
return IMMUTABLE | SIZED | SUBSIZED;
}
}
它通過變換給出Stream<CompletableFuture<T>>
到這些期貨的List
- 假設建立流快,辛勤工作由期貨本身做,所以列出它不應該是昂貴的。這也確保所有任務已經被觸發,因爲它強制處理源流。
爲了生成輸出流,它只是等待任何未來完成之前流傳輸其值。
一個簡單的非平行使用例(執行程序用於CompletableFuture
S,以便同時啓動它們全部):
ExecutorService executor = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
flattenStreamOfFutures(IntStream.range(0, 20)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((i % 10) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms");
return i;
}, executor)), false)
.forEach(x -> {
System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x);
});
executor.shutdown();
輸出:
Finished 10 @ 103ms
Finished 0 @ 105ms
main @ 114ms handle result: 10
main @ 114ms handle result: 0
Finished 1 @ 1102ms
main @ 1102ms handle result: 1
Finished 11 @ 1104ms
main @ 1104ms handle result: 11
Finished 2 @ 2102ms
main @ 2102ms handle result: 2
Finished 12 @ 2104ms
main @ 2105ms handle result: 12
Finished 3 @ 3102ms
main @ 3102ms handle result: 3
Finished 13 @ 3104ms
main @ 3105ms handle result: 13
…
作爲你可以看到,即使期貨沒有按期完成,該流幾乎立即產生價值。
的問題把它應用到的例子中,這將給(假設parseLinks()
回報CompletableFuture<String>
代替~<Void>
):
flattenStreamOfFuturesOfStream(IntStream.range(0, 10)
.mapToObj(this::getPage)
// the next map() will give a Stream<CompletableFuture<Stream<String>>>
// hence the need for flattenStreamOfFuturesOfStream()
.map(pcf -> pcf
.thenApply(page -> flattenStreamOfFutures(page
.getDocsId()
.stream()
.map(this::getDocument)
.map(docCF -> docCF.thenCompose(this::parseLinks)),
false))),
false)
.forEach(System.out::println);
基本上你需要flatMap CompletableFuture成流 –