2012-12-07 70 views
1

在線程拋出異常的情況下,我該如何等待,直到沒有拋出異常的所有線程都已完成(因此用戶在所有事情都停止之前不會再次啓動)?我使用GPars以幾種不同的方式,所以我需要爲每個(並行集合,異步閉包和fork/join)使用策略。例外情況沒有被埋沒,他們很好地通過承諾,getChildrenResults等處理,所以這不是問題(感謝Vaclav Pech的答案)。我只需要確保主線程等待,直到任何仍在運行的東西完成或以其他方式停止。例如,當使用並行集合時,一些線程繼續運行,而另一些線​​程在異常之後永遠不會啓動。因此,很難說出有多少人在等待,或者有可能抓住他們。GPars如何知道拋出異常時所有線程都已完成?

我的猜測是也許有一種方法來處理線程池(在這種情況下GParsPool)。有什麼建議麼?

謝謝!

回答

3

我相信我有這個問題的解決方案,我實現了它在全面測試後的應用程序和它的作品。

withPool closure作爲第一個參數傳入創建的池(a jsr166y.ForkJoinPool)。我可以抓住這一點,保存它關閉在一個變量(currentPool),要由主線程以後使用,就像這樣:

GParsPool.withPool { pool -> 
     currentPool = pool 

當一個異常被拋出,並返回到主線程對於處理,我可以把它等到一切都完了,像這樣的東西:

} catch (Exception exc) { 
     if (currentPool) { 
      while (!currentPool.isQuiescent()) { 
       Thread.sleep(100) 
       println 'waiting for threads to finish' 
      } 
     } 

     println 'all done' 
    } 

的isQuiescent()似乎是要確保有沒有做更多的工作,以安全的方式。

請注意,在測試過程中,我還發現異常似乎並不像我原先想象的那樣終止循環的執行。如果我有一個500的列表,並且做了eachParallel,他們都會跑,不管第一個是否有錯誤。所以我必須通過在並行循環的異常處理程序中使用currentPool.shutdownNow()來終止循環。另請參見:GPars - proper way to terminate a parallel collection early

下面是實際的解決方案的完整簡化表示:

void example() { 
    jsr166y.ForkJoinPool currentPool 

    AtomicInteger threadCounter = new AtomicInteger(0) 
    AtomicInteger threadCounterEnd = new AtomicInteger(0) 

    AtomicReference<Exception> realException = new AtomicReference<Exception>() 

    try { 
     GParsPool.withPool { pool -> 
      currentPool = pool 

      (1..500).eachParallel { 
       try { 
        if (threadCounter.incrementAndGet() == 1) { 
         throw new RuntimeException('planet blew up!') 
        } 

        if (realException.get() != null) { 
         // We had an exception already in this eachParallel - quit early 
         return 
        } 

        // Do some long work 
        Integer counter=0 
        (1..1000000).each() {counter++} 

        // Flag if we went all the way through 
        threadCounterEnd.incrementAndGet() 
       } catch (Exception exc) { 
        realException.compareAndSet(null, exc) 

        pool.shutdownNow() 
        throw realException 
       } 
      } 
     } 
    } catch (Exception exc) { 
     // If we used pool.shutdownNow(), we need to look at the real exception. 
     // This is needed because pool.shutdownNow() sometimes generates a CancellationException 
     // which can cover up the real exception that caused us to do a shutdownNow(). 
     if (realException.get()) { 
      exc = realException.get() 
     } 

     if (currentPool) { 
      while (!currentPool.isQuiescent()) { 
       Thread.sleep(100) 
       println 'waiting for threads to finish' 
      } 
     } 

     // Do further exception handling here... 
     exc.printStackTrace() 
    } 
} 

回到我前面的例子,如果我拋出異常一日一次通過一個4核的機器上,有大約5個線程排隊。 shutdownNow()會在大約20個左右的線程通過後停止工作,所以在頂端附近進行「退出提前」檢查可以幫助那些20個或更多的線程儘快退出。

只是在這裏發佈它以防萬一它幫助別人,作爲對我在這裏得到的所有幫助的回報。謝謝!

2

我相信你需要捕捉異常,然後返回除預期結果之外的其他內容(例如String或null,如果你期待一個數字例如),即;

@Grab('org.codehaus.gpars:gpars:0.12') 
import static groovyx.gpars.GParsPool.* 

def results = withPool { 
    [1,2,3].collectParallel { 
    try { 
     if(it % 2 == 0) { 
     throw new RuntimeException('2 fails') 
     } 
     else { 
     Thread.sleep(2000) 
     it 
     } 
    } 
    catch(e) { e.class.name } 
    } 
} 
+0

在我的示例中,結果將包含傳遞線程的整數,以及引發異常的字符串。所有線程將完成... –

+0

好的,我看到斷開連接的位置。假設您的集合中有1,000個項目要迭代。如果拋出一個異常,通常會發生的事情是組被中斷 - 它會停止觸發線程,這是我想要的,並且控制權返回到主線程。所以不是每個線程都會被解僱。但是,在主線程中,仍然有線程在異常之前啓動,但尚未完成清理。那有意義嗎? – user1373467

+0

現在,您的建議確實給了我一個想法 - 我可以保留多少實際被解僱並跟蹤完成的數量。我只是希望更簡單一些。但我很欣賞它,它是有用的+1 – user1373467

相關問題