2016-09-30 94 views
4

例如,我有這樣的方法:如何將1 completablefuture劃分爲流中的許多Completablefuture?

public CompletableFuture<Page> getPage(int i) { 
    ... 
} 
public CompletableFuture<Document> getDocument(int i) { 
    ... 
} 
public CompletableFuture<Void> parseLinks(Document doc) { 
    ... 
} 

而且我的流程:

List<CompletableFuture> list = IntStream 
    .range(0, 10) 
    .mapToObj(i -> getPage(i)) 

    // I want method like this: 
    .thenApplyAndSplit(CompletableFuture<Page> page -> { 
     List<CompletableFuture<Document>> docs = page.getDocsId() 
      .stream() 
      .map(i -> getDocument(i)) 
      .collect(Collectors.toList()); 
     return docs; 
    }) 
    .map(CompletableFuture<Document> future -> { 
     return future.thenApply(Document doc -> parseLink(doc); 
    }) 
    .collect(Collectors.toList()); 

它應該像flatMap()CompletableFuture,所以我想實現這個流程:

List<Integer> -> Stream<CompletableFuture<Page>> 
       -> Stream<CompletableFuture<Document>> 
       -> parse each 

UPDATE

Stream<CompletableFuture<Page>> pagesCFS = IntStream 
     .range(0, 10) 
     .mapToObj(i -> getPage(i)); 

Stream<CompletableFuture<Document>> documentCFS = listCFS.flatMap(page -> { 
    // How to return stream of Document when page finishes? 
    // page.thenApply(...) 
}) 
+0

基本上你需要flatMap CompletableFuture成流

回答

0

你真的不得不使用Streams嗎?難道你不能只對你的CompletableFutures進行一些相關的操作嗎?特別是自上一次調用返回CompletableFutures<Void>(當然,也有可能使用Collection.forEach

List<CompletableFuture<Page>> completableFutures = IntStream 
     .range(0, 10) 
     .mapToObj(i -> getPage(i)).collect(Collectors.toList()); 

for (CompletableFuture<Page> page : completableFutures) { 
    page.thenAccept(p -> { 
     List<Integer> docsId = p.getDocsId(); 
     for (Integer integer : docsId) { 
      getDocument(integer).thenAccept(d-> parseLinks(d)); 
     } 
    }); 
} 

編輯:嗯,所以我作了一次嘗試,但我不知道這是否是一個好主意,因爲我我不是CompletableFuture的專家。

使用下面的方法(也許有可能是一個更好的實現):

public static <T> CompletableFuture<Stream<T>> flatMapCF(Stream<CompletableFuture<T>> stream){ 
    return CompletableFuture.supplyAsync(()-> 
     stream.map(CompletableFuture::join) 
    ); 
} 


Stream<CompletableFuture<Page>> pagesCFS = IntStream 
     .range(0, 10) 
     .mapToObj(i -> getPage(i)); 

CompletableFuture<Stream<Page>> pageCF = flatMapCF(pagesCFS); 

CompletableFuture<Stream<Document>> docCF= 
    pageCF.thenCompose(a -> 
     flatMapCF(a.flatMap(
       b -> b.getDocsId() 
         .stream() 
         .map(c -> getDocument(c)) 
     ))); 

這個問題可能是,這是CompletableFuture回報,只有當所有的結果都可以

+0

的問題,我不要不知道有多少文檔頁面。未來完成後我可以知道這個 – mystdeim

+0

@mystdeim:但是應該在完成它的'CompletableFuture page'後調用'p.getDocsId()',如果這對你不好? – user140547

+0

沒關係,但我想在最後得到一個Document流。也許這對於jdk 8是不可能的,我應該使用RxJava,因爲flatMap運算符在那裏存在 – mystdeim

0

如果你不關心當操作完成,那麼下面將簡單地在所有文件上觸發parseLinks()

IntStream.range(0, 10) 
     .mapToObj(this::getPage) 
     .forEach(pcf -> pcf 
       .thenAccept(page -> page 
         .getDocsId() 
         .stream() 
         .map(this::getDocument) 
         .forEach(docCF -> docCF.thenCompose(this::parseLinks)))); 

行吟rwise,因爲你上次的操作返回CompletableFuture<Void>,我假設你主要有興趣知道什麼時候一切都完成了。你可以做這樣的事情:

CompletableFuture<Void> result = CompletableFuture.allOf(IntStream.range(0, 10) 
     .mapToObj(this::getPage) 
     .map(pcf -> pcf 
       .thenCompose(page -> CompletableFuture.allOf(page 
         .getDocsId() 
         .stream() 
         .map(docId -> getDocument(docId) 
           .thenCompose(this::parseLinks)) 
         .toArray(CompletableFuture[]::new)))) 
     .toArray(CompletableFuture[]::new)); 

如果你有興趣的個人CompletableFuture S的結果,最好的可能是直接在流中處理它們,在創建它們的地方。

你甚至可以用可重用的方法來包裝這一切。例如,如果parseLinks()被返回CompletableFuture<List<String>>,你可以定義一個方法是這樣的:

public CompletableFuture<Void> processLinks(Function<? super CompletableFuture<List<String>>, ? extends CompletableFuture<?>> processor) { 
    return CompletableFuture.allOf(IntStream.range(0, 10) 
      .mapToObj(this::getPage) 
      .map(pcf -> pcf 
        .thenCompose(page -> CompletableFuture.allOf(page 
          .getDocsId() 
          .stream() 
          .map(docId -> getDocument(docId) 
            .thenCompose(this::parseLinks)) 
          .map(processor) // here we apply the received function 
          .toArray(CompletableFuture[]::new)))) 
      .toArray(CompletableFuture[]::new)); 
} 

,並處理由此產生的名單是這樣的:

processLinks(linksCF -> linksCF 
     .thenAccept(links -> links.forEach(System.out::println))); 

返回CompletableFuture將完成一次所有鏈接已打印。

2

我也想爲CompletableFutures的碼流實施Spliterator拍一下,所以這裏是我的嘗試。

需要注意的是,如果你是在平行模式使用此,要注意使用不同的ForkJoinPool爲流和本CompletableFuture的背後運行的任務。該流將等待未來完成,因此,如果共享相同的執行程序,甚至發生死鎖,您實際上可能會失去性能。

因此,這裏是實現:

public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) { 
    return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel); 
} 

public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream, 
                  boolean parallel) { 
    return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity()); 
} 

public static class CompletableFutureSpliterator<T> implements Spliterator<T> { 
    private List<CompletableFuture<? extends T>> futures; 

    CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) { 
     futures = stream.collect(Collectors.toList()); 
    } 

    CompletableFutureSpliterator(CompletableFuture<T>[] futures) { 
     this.futures = new ArrayList<>(Arrays.asList(futures)); 
    } 

    CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) { 
     this.futures = new ArrayList<>(futures); 
    } 

    @Override 
    public boolean tryAdvance(final Consumer<? super T> action) { 
     if (futures.isEmpty()) 
      return false; 
     CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join(); 
     // now at least one of the futures has finished, get its value and remove it 
     ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size()); 
     while (it.hasPrevious()) { 
      final CompletableFuture<? extends T> future = it.previous(); 
      if (future.isDone()) { 
       it.remove(); 
       action.accept(future.join()); 
       return true; 
      } 
     } 
     throw new IllegalStateException("Should not reach here"); 
    } 

    @Override 
    public Spliterator<T> trySplit() { 
     if (futures.size() > 1) { 
      int middle = futures.size() >>> 1; 
      // relies on the constructor copying the list, as it gets modified in place 
      Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle)); 
      futures = futures.subList(middle, futures.size()); 
      return result; 
     } 
     return null; 
    } 

    @Override 
    public long estimateSize() { 
     return futures.size(); 
    } 

    @Override 
    public int characteristics() { 
     return IMMUTABLE | SIZED | SUBSIZED; 
    } 
} 

它通過變換給出Stream<CompletableFuture<T>>到這些期貨的List - 假設建立流快,辛勤工作由期貨本身做,所以列出它不應該是昂貴的。這也確保所有任務已經被觸發,因爲它強制處理源流。

爲了生成輸出流,它只是等待任何未來完成之前流傳輸其值。

一個簡單的非平行使用例(執行程序用於CompletableFuture S,以便同時啓動它們全部):

ExecutorService executor = Executors.newFixedThreadPool(20); 
long start = System.currentTimeMillis(); 
flattenStreamOfFutures(IntStream.range(0, 20) 
     .mapToObj(i -> CompletableFuture.supplyAsync(() -> { 
      try { 
       Thread.sleep((i % 10) * 1000); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       throw new RuntimeException(e); 
      } 
      System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms"); 
      return i; 
     }, executor)), false) 
     .forEach(x -> { 
      System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x); 
     }); 
executor.shutdown(); 

輸出:

Finished 10 @ 103ms 
Finished 0 @ 105ms 
main @ 114ms handle result: 10 
main @ 114ms handle result: 0 
Finished 1 @ 1102ms 
main @ 1102ms handle result: 1 
Finished 11 @ 1104ms 
main @ 1104ms handle result: 11 
Finished 2 @ 2102ms 
main @ 2102ms handle result: 2 
Finished 12 @ 2104ms 
main @ 2105ms handle result: 12 
Finished 3 @ 3102ms 
main @ 3102ms handle result: 3 
Finished 13 @ 3104ms 
main @ 3105ms handle result: 13 
… 

作爲你可以看到,即使期貨沒有按期完成,該流幾乎立即產生價值。

的問題把它應用到的例子中,這將給(假設parseLinks()回報CompletableFuture<String>代替~<Void>):

flattenStreamOfFuturesOfStream(IntStream.range(0, 10) 
       .mapToObj(this::getPage) 
       // the next map() will give a Stream<CompletableFuture<Stream<String>>> 
       // hence the need for flattenStreamOfFuturesOfStream() 
       .map(pcf -> pcf 
         .thenApply(page -> flattenStreamOfFutures(page 
             .getDocsId() 
             .stream() 
             .map(this::getDocument) 
             .map(docCF -> docCF.thenCompose(this::parseLinks)), 
           false))), 
     false) 
.forEach(System.out::println); 
+0

哇!看起來很完美,我一定會喜歡你的代碼。當你需要使用它時,你是否遇到過真實的情況? – mystdeim

+0

不,我只是喜歡這個問題的挑戰。需要更多的測試 –

+0

謝謝我會在幾天內測試你的代碼,並說出我會得出的結論 – mystdeim

相關問題