2014-04-27 53 views
10

我正在玩Java 8完整的期貨。我有以下代碼:如何取消Java 8可以完成的未來?

CountDownLatch waitLatch = new CountDownLatch(1); 

CompletableFuture<?> future = CompletableFuture.runAsync(() -> { 
    try { 
     System.out.println("Wait"); 
     waitLatch.await(); //cancel should interrupt 
     System.out.println("Done"); 
    } catch (InterruptedException e) { 
     System.out.println("Interrupted"); 
     throw new RuntimeException(e); 
    } 
}); 

sleep(10); //give it some time to start (ugly, but works) 
future.cancel(true); 
System.out.println("Cancel called"); 

assertTrue(future.isCancelled()); 

assertTrue(future.isDone()); 
sleep(100); //give it some time to finish 

使用runAsync我安排等待鎖存器的代碼的執行。接下來,我取消了未來,期待一個被打斷的異常被拋入其中。但是看起來線程在await調用中仍然被阻塞,並且即使未來被取消(斷言通過),也不會拋出InterruptedException。使用ExecutorService的等效代碼按預期工作。它是CompletableFuture中的錯誤還是在我的例子中?

+0

你可以用'Executors.newFixedThreadPool'和'Executors.newWorkStealingPool'重現問題嗎?如果比較兩種不同的執行者實施方式,比較期貨和可完成期貨的比較將會使問題更清晰。 – nosid

+0

JavaDoc說cancel(true)取消了CancellationException,但你沒有捕獲它。 – edharned

+0

@nosid你是對的,newWorkStealingPool顯然不支持取消 – Lukas

回答

10

顯然,這是故意的。對於該方法CompletableFuture::cancel狀態的Javadoc:

[參數:] mayInterruptIfRunning - 這個值在該實現方式沒有效果,因爲中斷將不被用於控制處理。

有趣的是,該方法使用ForkJoinTask::cancel幾乎相同的措辭mayInterruptIfRunning參數

我對這個問題的一個猜測:

  • 中斷旨在與阻塞操作中使用,如睡眠等待或I/O操作,
  • 但也不CompletableFutureForkJoinTask旨在與阻止操作一起使用。

而是阻塞,一個CompletableFuture應該創建一個新CompletionStage和CPU密集型任務是對的fork-join模型的前提條件。所以,使用中斷與他們中的任何一個都會打敗他們的目的。另一方面,這可能會增加複雜性,如果按預期使用則不需要。

+0

爲什麼你認爲_CompletableFuture_和_ForkJoinTask_不打算用於阻塞操作? –

+0

CompletableFuture和其他反應性的東西的重點是:不要讓線程等待一些長操作的結果而浪費線程。相反,提供一個回調,當結果在這裏時被調用。反應式方法需要少一些線程。 –

0

CancellationException是內部ForkJoin取消例程的一部分。當您檢索未來的結果時會出現異常:

try { future.get(); } 
     catch (Exception e){ 
      System.out.println(e.toString());    
     } 

花了一段時間在調試器中看到了這一點。 JavaDoc並不清楚正在發生什麼或您應該期望什麼。

1

當您撥打CompletableFuture#cancel時,您只能停止鏈條的下游部分。上游部分,我。即最終會調用complete(...)completeExceptionally(...)的東西,不會得到任何結果不再需要的信號。

那些'上游'和'下游'的東西是什麼?

讓我們看看下面的代碼:

CompletableFuture 
     .supplyAsync(() -> "hello")    //1 
     .thenApply(s -> s + " world!")   //2 
     .thenAccept(s -> System.out.println(s)); //3 

這裏,數據流從上到下 - 從供應商創建,通過功能進行修改,由println被消耗。特定步驟上面的部分稱爲上游,下面的部分稱爲下游。例如,步驟1和步驟2在步驟3的上游。

以下是幕後發生的情況。這不是精確的,而是一個方便的思維模型。

  1. 正在執行供應商(步驟1)(在JVM的共同ForkJoinPool內部)。
  2. 供應商的結果隨後被complete(...)傳遞到下游的下一個CompletableFuture
  3. 在接收到該結果,即CompletableFuture調用下一步驟 - 一個功能(步驟2),其發生在先前的步驟的結果,並返回的東西,將進一步通過,向下遊CompletableFuturecomplete(...)
  4. 在接收到步驟2的結果,步驟3 CompletableFuture調用消費者,System.out.println(s)。消費完成後,下游CompletableFuture將獲得它的價值,(Void) null

我們可以看到,在這條產業鏈的每個CompletableFuture有權知道誰是有下游等待價值被傳遞到他們的complete(...)(或completeExceptionally(...) )。但CompletableFuture不必知道它的上游(或上游 - 可能有幾個)。

因此,要求cancel()在第3步不中止步驟1和2,因爲沒有從第3步鏈接步驟2

據推測,如果你使用CompletableFuture那麼你步伐不大這足以讓如果執行幾個額外的步驟就沒有什麼壞處。

如果你想取消被傳播的上游,你有兩個選擇:

  • 這個工具你自己 - 創造出每一步之後檢查專用CompletableFuture(名字像cancelled)(類似step.applyToEither(cancelled, Function.identity())
  • 像RxJava 2使用反應堆,ProjectReactor /光通量或阿卡流
相關問題