2010-01-06 66 views
24

說我有這樣一個任務:在java中並行化任務的最簡單方法是什麼?

for(Object object: objects) { 
    Result result = compute(objects); 
    list.add(result); 
} 

什麼是並行計算每一個最簡單的方法()(假設他們已經並行)?

我不需要一個嚴格匹配上面的代碼的答案,只是一個普遍的答案。但是如果您需要更多信息:我的任務是IO綁定的,這是針對Spring Web應用程序的,並且這些任務將在HTTP請求中執行。

+4

應的第二行是'結果結果=計算(對象);'? – Carcigenicate 2015-10-10 18:31:35

回答

40

我建議看看ExecutorService

尤其是這樣的:

ExecutorService EXEC = Executors.newCachedThreadPool(); 
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); 
for (final Object object: objects) { 
    Callable<Result> c = new Callable<Result>() { 
     @Override 
     public Result call() throws Exception { 
      return compute(object); 
     } 
    }; 
    tasks.add(c); 
} 
List<Future<Result>> results = EXEC.invokeAll(tasks); 

注意,使用newCachedThreadPool可能是壞的,如果objects是一個大名單。緩存的線程池可以爲每個任務創建一個線程!你可能想要使用newFixedThreadPool(n),其中n是合理的(比如你擁有的核心數量,假設compute()是CPU綁定的)。

下面是實際運行的全碼:

import java.util.ArrayList; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class ExecutorServiceExample { 
    private static final Random PRNG = new Random(); 

    private static class Result { 
     private final int wait; 
     public Result(int code) { 
      this.wait = code; 
     } 
    } 

    public static Result compute(Object obj) throws InterruptedException { 
     int wait = PRNG.nextInt(3000); 
     Thread.sleep(wait); 
     return new Result(wait); 
    } 

    public static void main(String[] args) throws InterruptedException, 
     ExecutionException { 
     List<Object> objects = new ArrayList<Object>(); 
     for (int i = 0; i < 100; i++) { 
      objects.add(new Object()); 
     } 

     List<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); 
     for (final Object object : objects) { 
      Callable<Result> c = new Callable<Result>() { 
       @Override 
       public Result call() throws Exception { 
        return compute(object); 
       } 
      }; 
      tasks.add(c); 
     } 

     ExecutorService exec = Executors.newCachedThreadPool(); 
     // some other exectuors you could try to see the different behaviours 
     // ExecutorService exec = Executors.newFixedThreadPool(3); 
     // ExecutorService exec = Executors.newSingleThreadExecutor(); 
     try { 
      long start = System.currentTimeMillis(); 
      List<Future<Result>> results = exec.invokeAll(tasks); 
      int sum = 0; 
      for (Future<Result> fr : results) { 
       sum += fr.get().wait; 
       System.out.println(String.format("Task waited %d ms", 
        fr.get().wait)); 
      } 
      long elapsed = System.currentTimeMillis() - start; 
      System.out.println(String.format("Elapsed time: %d ms", elapsed)); 
      System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum/(elapsed * 1d))); 
     } finally { 
      exec.shutdown(); 
     } 
    } 
} 
+0

有沒有這個C#版本? – Malfist 2010-01-06 20:48:17

+1

也看看執行者,它作爲各種執行者服務的工廠的功能。 – 2010-01-06 20:49:45

+0

@Malfist在C#中有一些任務(對於即將推出的.net 4),使所有這些都變得輕而易舉:)。 有代表/ lambda表達式和線程,funcs,threadstart等在3.5 – 2010-01-16 18:44:33

0

可以簡單地創建幾個線程並獲得結果。

Thread t = new Mythread(object); 

if (t.done()) { 
    // get result 
    // add result 
} 

編輯:我認爲其他解決方案更酷。

0

我要去提一個執行者類。以下是您將放置在執行程序類中的一些示例代碼。

private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4); 

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>(); 

    public void addCallable(Callable<Object> callable) { 
     this.callableList.add(callable); 
    } 

    public void clearCallables(){ 
     this.callableList.clear(); 
    } 

    public void executeThreads(){ 
     try { 
     threadLauncher.invokeAll(this.callableList); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    public Object[] getResult() { 

     List<Future<Object>> resultList = null; 
     Object[] resultArray = null; 
     try { 

      resultList = threadLauncher.invokeAll(this.callableList); 

      resultArray = new Object[resultList.size()]; 

      for (int i = 0; i < resultList.size(); i++) { 
       resultArray[i] = resultList.get(i).get(); 
      } 

     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     return resultArray; 
    } 

然後使用它你可以調用executor類來填充和執行它。

executor.addCallable(some implementation of callable) // do this once for each task 
Object[] results = executor.getResult(); 
+0

它總是讓我煩惱,沒有一套工作的包裝類 – 2010-01-06 20:50:03

1

這裏的東西在自己的項目中,我使用:

public class ParallelTasks 
{ 
    private final Collection<Runnable> tasks = new ArrayList<Runnable>(); 

    public ParallelTasks() 
    { 
    } 

    public void add(final Runnable task) 
    { 
     tasks.add(task); 
    } 

    public void go() throws InterruptedException 
    { 
     final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime() 
       .availableProcessors()); 
     try 
     { 
      final CountDownLatch latch = new CountDownLatch(tasks.size()); 
      for (final Runnable task : tasks) 
       threads.execute(new Runnable() { 
        public void run() 
        { 
         try 
         { 
          task.run(); 
         } 
         finally 
         { 
          latch.countDown(); 
         } 
        } 
       }); 
      latch.await(); 
     } 
     finally 
     { 
      threads.shutdown(); 
     } 
    } 
} 

// ... 

public static void main(final String[] args) throws Exception 
{ 
    ParallelTasks tasks = new ParallelTasks(); 
    final Runnable waitOneSecond = new Runnable() { 
     public void run() 
     { 
      try 
      { 
       Thread.sleep(1000); 
      } 
      catch (InterruptedException e) 
      { 
      } 
     } 
    }; 
    tasks.add(waitOneSecond); 
    tasks.add(waitOneSecond); 
    tasks.add(waitOneSecond); 
    tasks.add(waitOneSecond); 
    final long start = System.currentTimeMillis(); 
    tasks.go(); 
    System.err.println(System.currentTimeMillis() - start); 
} 

這對我的雙核箱2000打印位。

相關問題