2012-02-02 11 views
0

在使用ForkJoinPool提交任務(RecursiveActionRecursiveTask)時處理異常(未捕獲)的更好方法是什麼?更好的方式來處理ForkJoinPool中的未捕獲的異常任務/動作

ForkJoinPool接受Thread.UncaughtExceptionHandler來處理當WorkerThread突然終止(反正不在我們的控制下)時的異常,但當ForkJoinTask引發異常時不使用此處理程序。我在我的實施中使用標準submit/invokeAll方式。

這裏是我的情況:

我有一個第三方系統中的無限循環中讀取數據運行的線程。在此主題我提交任務到ForkJoinPool

new Thread() { 
     public void run() { 
     while (true) { 
      ForkJoinTask<Void> uselessReturn = 
        ForkJoinPool.submit(RecursiveActionTask); 
     } 
     } 
} 

我使用的是RecursiveAction和幾個場景一RecursiveTask。這些任務使用submit()方法提交給FJPool。 我想擁有一個類似於UncaughtExceptionHandler的泛型異常處理程序,如果任務引發未檢查/未捕獲的異常,我可以處理異常並在需要時重新提交任務。處理異常還可確保排隊的任務不會被取消,如果其中一個/某些任務引發異常。

invokeAll()方法返回一組的ForkJoinTasks的但這些任務在一個遞歸塊(每個任務調用compute()方法和可分裂進一步[假設的情況])

class RecursiveActionTask extends RecursiveAction { 

    public void compute() { 
     if <task.size() <= ACCEPTABLE_SIZE) { 
      processTask() // this might throw an checked/unchecked exception 
     } else { 
      RecursiveActionTask[] splitTasks = splitTasks(tasks) 
      RecursiveActionTasks returnedTasks = invokeAll(splitTasks); 
      // the below code never executes as invokeAll submits the tasks to the pool 
      // and the flow never comes to the code below. 
      // I am looking for some handling like this 
      for (RecusiveActionTask task : returnedTasks) { 
      if (task.isDone()) { 
       task.getException() // handle this exception 
      } 
      } 
     } 
    } 

} 

我注意到,當3-4任務失敗,整個隊列提交單元被丟棄。目前我已經把一個try/catch圍繞processTask,我個人不喜歡。我正在尋找更通用的。

  1. 我也想了解失敗使任務的全部名單,我可以重新提交他們
  2. 當任務拋出異常做線程從池中獲取驅逐(雖然我的分析發現,他們沒有按'[但不知道])?
  3. 在FutureTask上調用get()方法將更有可能使我的流程順序,因爲它等待任務完成。
  4. 我只想知道任務的狀態是否失敗。它完成(顯然不希望以後等待一個小時)

任何想法如何處理上述情況例外,當我不在乎?

回答

2

這-i顯示了我們解決了在阿卡:

/** 
* INTERNAL AKKA USAGE ONLY 
*/ 
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] { 
    final override def setRawResult(u: Unit): Unit =() 
    final override def getRawResult(): Unit =() 
    final override def exec(): Boolean = try { mailbox.run; true } catch { 
    case anything ⇒ 
     val t = Thread.currentThread 
     t.getUncaughtExceptionHandler match { 
     case null ⇒ 
     case some ⇒ some.uncaughtException(t, anything) 
     } 
     throw anything 
    } 
} 
+0

我陌生​​AKKA,但如果我理解正確的話你要爲你的代碼拋出,當前線程異常處理程序的異常?如果是這樣,線程將從池中移除,因爲線程池執行程序可能會檢查此異常並將該線程標記爲壞線程。你可以解釋嗎..? – Rajendra 2012-04-18 20:38:34