2016-01-21 67 views
1

我有工作接口IJob這樣的:在Java中運行多個線程,錯誤管理

public interface IJob { 
    public void execute(); 
} 

在我的應用程序有多個類實現了這個接口,像IJob1和IJob2:

public class IJob1 implements IJob{ 
@Override 
public void execute(){ 
    System.out.println("IJob1\n"); 
    } 
} 

public class IJob2 implements IJob{ 
@Override 
public void execute(){ 
    System.out.println("IJob2\n"); 
    } 
} 

因爲要運行的作業量穩步增加,所以我想創建一個新的作業,它將IJob實例列表並行運行。新實現用於並行運行作業的線程數量應該是可配置的。如果其中一個作業引發異常,則其他所有當前正在運行的作業也應該停止,並且execute()方法應將異常傳遞給調用者。

我寫了這個,但我不能夠運行作業和檢查,如果有一個錯誤:

import java.util.LinkedList; 

public class WorkQueue 
{ 
    private final int nThreads; 
    private final IJob[] threads; 
    private final LinkedList queue; 

    public WorkQueue(int nThreads) 
    { 
     this.nThreads = nThreads; 
     queue = new LinkedList(); 
     threads = new IJob[nThreads]; 

     for (int i=0; i<nThreads; i++) { 
      threads[i] = new IJob(); 
      threads[i].execute(); 
     } 
    } 

    public void execute(Runnable r) { 
    synchronized(queue) { 
     queue.addLast(r); 
     queue.notify(); 
    } 
} 

private class PoolWorker extends Thread { 
    public void run() { 
     Runnable r; 

     while (true) { 
      synchronized(queue) { 
       while (queue.isEmpty()) { 
        try 
        { 
         queue.wait(); 
        } 
        catch (InterruptedException ignored) 
        { 
        } 
       } 

       r = (Runnable) queue.removeFirst(); 
      } 

      // If we don't catch RuntimeException, 
      // the pool could leak threads 
      try { 
       r.run(); 
      } 
      catch (RuntimeException e) { 
       // You might want to log something here 
      } 
     } 
    } 
} 
} 

能否請您給我一些幫助很少去? 非常感謝。

+2

看看'ThreadPoolExecutor'。你的工作將是這方面的任務。 – Thomas

+0

它們不是線程,所以它們按順序執行......創建一個需要擴展'Thread'類的線程 – Marcx

+0

感謝您的幫助。我會去看看這個背景 – sharkbait

回答

5

我會推薦一個託管線程池。典型模式是使用Java Executors來獲得ExecutorService。 ExecutorService通常使用線程池和作業隊列來實現。在ExecutorService上,有一些方法如shutdownNow()「嘗試停止所有正在執行的任務」。這聽起來像你想做的事情。

例,

List<Callable<Result>> tasks = new ArrayList<>(); 
for (final Object job: jobs) { 
    final Callable<Result> task = new Callable<Result>() { 
      @Override 
      public Result call() throws Exception { 
       // Do job here 
       return result; 
      } 
     }; 
    tasks.add(task); 
} 
final numOfThreads = 20; 
final ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); 
try { 
    executor.invokeAll(tasks); 
} finally { 
    executor.shutdownNow(); 
} 
+0

太棒了!你能否在我的答案中給我一個例子? – sharkbait

+0

@sharkbait如果你不能使用Java 8,我會使用ExecutorService。 –

+0

我使用Java 7,所以我可以使用它? – sharkbait

4

I want to create a new job, which takes a list of IJob instances and runs them in parallel. The amount of threads that the new implementation is using to run the jobs in parallel should be configurable. If one of the jobs throws an exception, all other currently running jobs should be stopped as well and the execute() method should pass the exception to the caller.

這是使用parallelStream()會讓你的生活更加簡單。你可以做

list.parallelStream(). 
    .forEach(IJob::execute); 

你可能會發現你根本不需要框架,你可以將這段代碼合併到調用者中。例如

Map<String, T> map = dataSet.parallelStream() 
          .map(t -> t.transform1()) 
          .filter(t -> t.isGood()) 
          .collect(Collectors.groupingByConcurrent(t.getKey()));