13

Java 8並行流如何在消費子句中拋出的異常上行爲,例如在forEach處理中?例如,以下代碼:Java 8並行流如何在拋出的異常上運行?

final AtomicBoolean throwException = new AtomicBoolean(true); 
IntStream.range(0, 1000) 
    .parallel() 
    .forEach(i -> { 
     // Throw only on one of the threads. 
     if (throwException.compareAndSet(true, false)) { 
      throw new RuntimeException("One of the tasks threw an exception. Index: " + i); 
     }); 

是否立即停止處理的元素?它是否等待已經啓動的元素完成?它是否等待所有的流完成?拋出異常後它是否開始處理流元素?

它什麼時候返回?異常之後立即?畢竟/部分元素是由消費者處理的?

在並行流拋出異常後繼續處理元素嗎? (發現這種情況發生)。

這裏有一個通用規則嗎?

編輯(15-11-2016)

以確定是否可以並行流早回來,我發現它不確定性:

@Test 
public void testParallelStreamWithException() { 
    AtomicInteger overallCount = new AtomicInteger(0); 
    AtomicInteger afterExceptionCount = new AtomicInteger(0); 
    AtomicBoolean throwException = new AtomicBoolean(true); 

    try { 
     IntStream.range(0, 1000) 
      .parallel() 
      .forEach(i -> { 
       overallCount.incrementAndGet(); 
       afterExceptionCount.incrementAndGet(); 
       try { 
        System.out.println(i + " Sleeping..."); 
        Thread.sleep(1000); 
        System.out.println(i + " After Sleeping."); 
       } 
       catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       // Throw only on one of the threads and not on main thread. 
       if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) { 
        System.out.println("Throwing exception - " + i); 
        throw new RuntimeException("One of the tasks threw an exception. Index: " + i); 
       } 
      }); 
     Assert.fail("Should not get here."); 
    } 
    catch (Exception e) { 
     System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0."); 
     afterExceptionCount.set(0); 
    } 
    System.out.println("Overall count: " + overallCount.get()); 
    System.out.println("After exception count: " + afterExceptionCount.get()); 
} 

逾期歸還從拋時不主線程。這導致很多新的元素在拋出異常後被處理。在我的機器上,拋出異常後處理了大約200個元素。但是,並非所有的1000個元素都被處理了。那麼這裏的規則是什麼?爲什麼即使拋出異常也處理了更多元素?

提前返回時刪除不是(!)符號,導致異常在主線程中拋出。只有已經開始的元素完成了處理,並沒有處理新的元素。早在這裏就是這種情況。與以前的行爲不一致。

我在這裏錯過了什麼?

回答

4

當一個異常在一個階段拋出,它不會等待其他操作完成,異常被重新拋出給調用者。 這就是ForkJoinPool如何處理它。

相反的FindFirst例如在並行運行時,將目前的結果,以僅在所有的操作已經完成處理(即使結果是需要的所有操作的結束之前已知的)調用者。

放在換句話說:它會提前返回,但會留下所有正在運行的任務完成。

編輯回答最後一個評論

這是由霍爾格的回答很說明(在評論鏈接),但這裏有一些細節。

1)當全部遇難,但主線程,你也殺了所有應該由這些線程處理的任務。所以應該實際上是250左右,因爲有1000個任務和4個線程,我假設這返回3?:

int result = ForkJoinPool.getCommonPoolParallelism(); 

理論上有1000個任務,有4個線程,每個線程應該處理250級的任務,那麼你殺了他們的意思750個任務的3丟失。 還有250個任務需要執行,而ForkJoinPool將跨越3個新線程來執行這250個左側任務。

有幾件事情,你可以嘗試,改變你的流像這樣(做不大小的流):

IntStream.generate(random::nextInt).limit(1000).parallel().forEach 

這一次,會有很多結局更多的操作,因爲最初的分離指標是未知的,通過其他策略選擇。你也可以嘗試爲改變這樣的:

if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) { 

這樣:

if (!Thread.currentThread().getName().equals("main")) { 

這個時候你總是會殺除主所有線程,直到某一點,在沒有新的線程將被創建ForkJoinPool作爲任務太小而無法拆分,因此不需要其他線程。在這種情況下,甚至更少的任務將完成。

2)你的第二個例子,當你真正殺死主線程,如代碼的方式,你不會看到其他線程的實際運行。改變它:

} catch (Exception e) { 
     System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0."); 
     afterExceptionCount.set(0); 
    } 

    // give some time for other threads to finish their work. You could play commenting and de-commenting this line to see a big difference in results. 
    TimeUnit.SECONDS.sleep(60); 

    System.out.println("Overall count: " + overallCount.get()); 
    System.out.println("After exception count: " + afterExceptionCount.get()); 
+0

你可以根據一些文件? –

+3

@ AlikElzin-kilaka不是真的,我不認爲這是記錄。我記得通過閱讀其他SO問題,引用了這個bug:https://bugs.openjdk.java.net/browse/JDK-8164690 – Eugene

+2

@ AlikElzin-kilaka還有[this](http:// mail.openjdk.java.net/pipermail/core-libs-dev/2016-August/042972.html)在core-libs-dev郵件列表上討論的話題,這個郵件列表導致了Eugene提到的JBS bug。 –

相關問題