3

我正在調用使用invokeAll()的線程列表。 AFAIK invokeAll()僅在所有線程完成其任務時纔會返回。如果線程列表中的任何線程發生異常,則中斷所有線程

ExecutorService threadExecutor = Executors.newFixedThreadPool(getThreadSize()); 
List<Future<Object>> future = w_threadExecutor.invokeAll(threadList); 

當所有線程完成

for (Future<Object> w_inProgressThread : w_future) 
{ 
// 

它停止在其出現異常,而不是剩下的一個線程這就是所謂的。 如果任何線程拋出異常,是否有辦法停止所有其他線程? 或者我必須提交每個任務而不是invokeAll()?我試過在invokeAll()上使用invokeAny()而不是cancell剩餘的任務 invokeAny():如果其中一個任務完成(或引發異常),則其餘的Callable被取消。 編號:http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

更新:

CompletionService<Object> completionService = new ExecutorCompletionService<Object>(w_threadExecutor); 
       List<Future<Object>> futures = new ArrayList<Future<Object>>(); 
       for(Thread w_mt : threadList) 
       { 
       futures.add(completionService.submit(w_mt)); 
       } 
       for (int numTaken = 0; numTaken < futures.size(); numTaken++) { 
        Future f = completionService.take(); 
        try { 
         Object result = f.get(); 
         System.out.println(result); // do something with the normal result 
        } catch (Exception e) { 
         System.out.println("Catched ExecutionException, shutdown now!"); 
         //threadExecutor.shutdownNow(); 
         Thread.currentThread().interrupt(); 

         for (Future<Object> inProgressThread : futures) 
         { 
          inProgressThread.cancel(true); 
         } 
         break; 
        } 

更新1:

至於建議由waltersu我試圖

ExecutorService threadExecutor = Executors.newFixedThreadPool(3); 
       CompletionService<Object> completionService = new ExecutorCompletionService<Object>(threadExecutor); 
       List<Future<Object>> futures = new ArrayList<Future<Object>>(); 
       futures.add(completionService.submit(new Callable<Object>() { 
       @Override 
       public Object call() throws Exception { 
        String s=null; 
       // Thread.sleep(1000); 
        for(int i=0; i < 1000000; i++){ 
         int j =10 ; 
         if(i==100) 
         { 

         s.toString(); 
         } 

         System.out.println("dazfczdsa :: " + i); 
        } 
        //throw new Exception("This is an expected Exception"); 
       return s; 
       } 
       })); 
       futures.add(completionService.submit(new Callable<Object>() { 
       @Override 
       public Object call() throws Exception { 
        for(int i=0; i < 1000000; i++){ 
         int j =0 ; 
         j= j+2; 
         System.out.println("dasa :: " + i); 
        } 
        Thread.sleep(3000); 

        return "My First Result"; 
       } 
       })); 

       while (futures.size() > 0) { 
       Future f = completionService.take(); 
       futures.remove(f); 
       try { 
        Object result = f.get(); 
        System.out.println(result); // do something with the normal result 
       } catch (ExecutionException e) { 
        System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!"); 
        f.cancel(true); 
        threadExecutor.shutdownNow(); 
        break; 
       } 
       } 
       System.out.println("Main exists"); 

這個時候發生異常

+0

threadExecutor.notifyAll()中斷所有線程 –

+0

@AkashLodha ''從'Object'中的notifyAll()'喚醒在這個對象的監視器上等待的所有線程「。線程不在任何特定對象的顯示器上等待,是嗎?我錯過了什麼? – davmac

+0

我假設threadList是想要在同一個對象上鎖定的線程。 –

回答

0
不會停止
+0

如果一個任務以異常退出,invokeAll仍然等待所有任務完成。 – waltersu

+0

你..我已經改變了答案 –

0

您可以調用包含您的工作可執行文件的引導程序,並引用它們將要調用的執行程序。例外情況下,您可以在run()中立即關閉。

class ExceptionHandlingWrapper implements Runnable{ 
    private ExecutorService es; 
    private Runnable childRunnable; 

    // CTOR taking an ExecutorService and a Runnable 
    public ExceptionHandlingWrapper (ExecutorService es, Runnable work){ 
     this.es = es; 
     this.childRunnable = work; 
    } 

    @Override public void run(){ 
     try{ 
      childRunnable.run(); 
     } 
     catch(Exception ex){ 
      // Todo: LOG IT! 
      es.shutdownNow(); 
     } 
    } 
} 

這也適用於Callables當然。

3

你必須提交()一個接一個,而不是的invokeAll(),然後檢查未來有例外。

public static void main(String[] args) throws InterruptedException { 
    ExecutorService threadExecutor = Executors.newFixedThreadPool(3); 
    CompletionService<Object> completionService = new ExecutorCompletionService<>(threadExecutor); 
    List<Future<Object>> futures = new ArrayList<>(); 
    futures.add(completionService.submit(new Callable<Object>() { 
    @Override 
    public Object call() throws Exception { 
     Thread.sleep(1000); 
     throw new Exception("This is an expected Exception"); 
    } 
    })); 
    futures.add(completionService.submit(new Callable<Object>() { 
    @Override 
    public Object call() throws Exception { 
     Thread.sleep(3000); 
     return "My First Result"; 
    } 
    })); 

    while (futures.size() > 0) { 
    Future f = completionService.take(); 
    futures.remove(f); 
    try { 
     Object result = f.get(); 
     System.out.println(result); // do something with the normal result 
    } catch (ExecutionException e) { 
     System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!"); 
     threadExecutor.shutdownNow(); 
     break; 
    } 
    } 
    System.out.println("Main exists"); 
} 

更新1:(回答OP的更新1題)

那是因爲你的任務有很長的環路,它不檢查中斷,這使得你的任務不可取消。那麼你如何阻止它?我認爲你必須修改你的其他任務以使它們可以被取消。由於the official doc說:

如果一個線程很長時間沒有調用引發InterruptedException的方法會怎麼樣?然後它必須定期調用Thread.interrupted,如果接收到中斷,則返回true。例如:

for (int i = 0; i < inputs.length; i++) { 
    heavyCrunch(inputs[i]); 
    if (Thread.interrupted()) { 
     // We've been interrupted: no more crunching. 
     return; 
    } 
} 

,如果你不希望修改的任務,也希望它很快停下來?有一種方法可以阻止不可取消的線程。它是Thread.stop()。但是,首先,如果不使用反射,您無法從線程池中獲取線程。此外,根據javadoc,因「根本上不安全」而不推薦使用。因此,最佳實踐(我認爲)是檢查你的任務(或代碼的一部分)的中斷,這是無法解決的,需要很長時間才能完成。

+0

請檢查我更新的問題。 – happy

+0

更新:如果所有任務都成功完成,仍然需要調用threadExecutor.shutdown()以便應用程序可以存在。因爲http://stackoverflow.com/a/20057584/4493265 – waltersu

+0

請檢查我的更新1:在問題..如果發生異常ut不會終止其他線程 – happy