2012-07-09 70 views
2

我有一個任務處理需要拋出IOException的文件目錄,如果出現任何問題。我也需要它更快,所以我將工作分解成多個線程並等待終止。它看起來是這樣的:創建自我中斷執行程序服務

//Needs to throw IOException so the rest of the framework handles it properly. 
public void process(File directory) throws IOException { 
    ExecutorService executorService = 
     new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS, 
      new LinkedBlockingQueue<Runnable>()); 

    //Convenience class to walk over relevant file types. 
    Source source = new SourceImpl(directory); 
    while (source.hasNext()) { 
     File file = source.next(); 
     executorService.execute(new Worker(file)); 
    } 

    try { 
     executorService.shutdown(); 
     executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
    } catch (InterruptedException e) { 
     executorService.shutdownNow(); 
     throw new IOException("Worker thread had a problem!"); 
    } 
} 

雖然工作者線程基本上是:

private class Worker implements Runnable { 
    private final File file; 
    public Worker(File file) { this.file = file; } 

    @Override 
    public void run() { 
     try { 
      //Do work 
     } catch (IOException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 

所需的行爲方式是,如果任何工人有IOException,則產卵線由意識到這一點,並能在轉身拋出自己的IOException。這是我想到的允許Worker線程發出錯誤信號的最佳方式,但我仍然不確定是否將其設置正確。

所以,首先,這會做我所期望的嗎?如果一個Worker線程在run()中有一個錯誤,將會調用Thread.currentThread().interrupt();導致InterruptedException被拋出,使得它被阻塞executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);捕獲?其次,如果一個正在運行的Worker在所有線程排隊之前調用它的中斷會發生什麼;在阻止try/catch塊之前?

最後(也是最重要的),有沒有更優雅的方式來實現我的目標?我想讓所有無數的子線程都執行直到完成或直到其中任何一個有錯誤,此時我想在生成的線程中處理它(通過有效地使整個目錄失效)。


基於答案SOLUTION

,下面是我最終使用的實施。它很好地處理了我的異步期望,並在IOExceptions上乾淨而快速地失敗。

public void process(File directory) throws IOException { 
    //Set up a thread pool of 16 to do work. 
    ExecutorService executorService = Executors.newFixedThreadPool(16); 
    //Arbitrary file source. 
    Source source = new SourceImpl(directory); 
    //List to hold references to all worker threads. 
    ArrayList<Callable<IOException>> filesToWork = 
     new ArrayList<Callable<IOException>>(); 
    //Service to manage the running of the threads. 
    ExecutorCompletionService<IOException> ecs = 
     new ExecutorCompletionService<IOException>(executorService); 

    //Queue up all of the file worker threads. 
    while (source.hasNext()) 
     filesToWork.add(new Worker(file)); 

    //Store the potential results of each worker thread. 
    int n = filesToWork.size(); 
    ArrayList<Future<IOException>> futures = 
     new ArrayList<Future<IOException>>(n); 

    //Prepare to return an arbitrary worker's exception. 
    IOException exception = null; 
    try { 
     //Add all workers to the ECS and Future collection. 
     for (Callable<IOException> callable : filesToWork) 
      futures.add(ecs.submit(callable)); 
     for (int i = 0; i < n; i++) { 
      try { 
       //Get each result as it's available, sometimes blocking. 
       IOException e = ecs.take().get(); 
       //Stop if an exception is returned. 
       if (e != null) { 
        exception = e; 
        break; 
       } 
      //Also catch our own exceptions. 
      } catch (InterruptedException e) { 
       exception = new IOException(e); 
       break; 
      } catch (ExecutionException e) { 
       exception = new IOException(e); 
       break; 
      } 
     } 
    } finally { 
     //Stop any pending tasks if we broke early. 
     for (Future<IOException> f : futures) 
      f.cancel(true); 
     //And kill all of the threads. 
     executorService.shutdownNow(); 
    } 

    //If anything went wrong, it was preserved. Throw it now. 
    if (exception != null) 
     throw exception; 
} 

而且

//Does work, and returns (not throws) an IOException object on error. 
private class Worker implements Callable<IOException> { 
    private final File file; 
    public Worker(File file) { this.file = file; } 

    @Override 
    public IOException call() { 
     try { 
      //Do work 
     } catch (IOException e) { 
      return e; 
     } 
     return null; 
    } 
} 
+1

Thread.currentThread()中斷(); 只中斷當前正在運行的線程,在這種情況下是工作人員本身。所有其他正在運行的線程都不受此中斷的影響。 – 2012-07-09 19:40:34

+0

而且您不能依賴InterruptedException,因爲它可能會或可能不會被引發。即使它被拋出,也會丟失原始IOException的上下文。 – 2012-07-09 19:47:38

+0

我有一種感覺InterruptedException是不可靠的,雖然我不會關心最初的IOException的上下文以及它發生的事實。但是,傑塔爾博恩已經指出了我沒有意識到我正在尋找的方向。 – 2012-07-09 20:46:16

回答

2

調用interrupt()那樣會不是影響主線程。

你應該做的是讓你的工作人員Callable而不是Runnable,並允許失敗例外離開call()方法。然後,使用ExecutorCompletionService執行你所有的工人。這將允許您確定每個任務的狀態,並在其中一個任務失敗時在主線程中執行操作。

+0

謝謝。我以前不知道CompletionServices,但他們似乎正是我想要做的;當然比我想要做的更優雅。 – 2012-07-09 20:44:56

1

與往常一樣,線程之間最好的溝通是隊列。讓每個工作人員發送一條描述其執行完成情況的消息,並讓產生的線程從隊列中讀取。此外,由於產卵線程知道它產生了多少工人,因此只需計算消息即可知道所有工人何時完成,而不依賴於池關閉。

1

製作工人實施Callable<Void>,你可以這樣做:

public void process(File directory) throws IOException { 
    ExecutorService executorService = new ThreadPoolExecutor(16, 16, 
      Long.MAX_VALUE, TimeUnit.NANOSECONDS, 
      new LinkedBlockingQueue<Runnable>()); 

    // Convenience class to walk over relevant file types. 
    List<Future<Void>> futures = new ArrayList<Future<Void>>(); 
    Source source = new SourceImpl(directory); 
    while (source.hasNext()) { 
     File file = source.next(); 
     futures.add(executorService.submit(new Worker(file))); 
    } 

    try { 
     for (Future<Void> future : futures) { 
      future.get(); 
     } 
    } catch (ExecutionException e) { 
     throw new IOException("Worker thread had a problem!", e.getCause()); 
    } catch (InterruptedException e) { 
     throw new IOException("Worker thread had a problem!", e); 
    } finally { 
     executorService.shutdown(); 
    } 
}