2011-10-18 144 views
1

我有以下情況:Java線程等待值

爲了運行一個算法,我必須運行多個線程和每個線程都設置一個實例變量x,它死前。問題是這些線程不會立即返回:

public Foo myAlgorithm() 
{ 
    //create n Runnables (n is big) 
    //start these runnables (may take long time do die) 

    //i need the x value of each runnable here, but they havent finished yet! 

    //get average x from all the runnables 

    return new Foo(averageX); 
} 

我應該使用等待通知嗎?或者我應該只是嵌入一個while循環並檢查終止?

謝謝大家!

回答

4

創建一些共享存儲來保存來自每個線程的值x,或者只需存儲總和即可。使用CountDownLatch等待線程終止。每個線程完成後將調用CountDownLatch.countDown(),並且您的myAlgorithm方法將使用CountDownLatch.await()方法等待它們。

編輯:下面是我建議的方法的完整示例。它創建了39個工作線程,每個線程都將一個隨機數添加到共享總和中。當所有工人都完成後,平均數就會被計算和打印。

import java.util.Random; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.atomic.AtomicInteger; 

class Worker implements Runnable { 

    private final AtomicInteger sum; 
    private final CountDownLatch latch; 

    public Worker(AtomicInteger sum, CountDownLatch latch) { 
     this.sum = sum; 
     this.latch = latch; 
    } 

    @Override 
    public void run() { 
     Random random = new Random(); 

     try { 
      // Sleep a random length of time from 5-10s 
      Thread.sleep(random.nextInt(5000) + 5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     // Compute x 
     int x = random.nextInt(500); 

     // Add to the shared sum 
     System.out.println("Adding " + x + " to sum"); 
     sum.addAndGet(x); 

     // This runnable is finished, so count down 
     latch.countDown(); 
    } 
} 

class Program { 

    public static void main(String[] args) { 
     // There will be 39 workers 
     final int N = 39; 

     // Holds the sum of all results from all workers 
     AtomicInteger sum = new AtomicInteger(); 
     // Tracks how many workers are still working 
     CountDownLatch latch = new CountDownLatch(N); 

     System.out.println("Starting " + N + " workers"); 

     for (int i = 0; i < N; i++) { 
      // Each worker uses the shared atomic sum and countdown latch. 
      Worker worker = new Worker(sum, latch); 

      // Start the worker 
      new Thread(worker).start(); 
     } 

     try { 
      // Important: waits for all workers to finish. 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     // Compute the average 
     double average = (double) sum.get()/(double) N; 

     System.out.println(" Sum: " + sum.get()); 
     System.out.println("Workers: " + N); 
     System.out.println("Average: " + average); 
    } 

} 

輸出應該是這樣的:

Starting 39 workers 
Adding 94 to sum 
Adding 86 to sum 
Adding 454 to sum 
... 
... 
... 
Adding 358 to sum 
Adding 134 to sum 
Adding 482 to sum 
    Sum: 10133 
Workers: 39 
Average: 259.8205128205128 

編輯:只是爲了好玩,這裏是一個使用ExecutorServiceCallableFuture一個例子。

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Future; 
import java.util.concurrent.ScheduledThreadPoolExecutor; 

class Worker implements Callable<Integer> { 

    @Override 
    public Integer call() throws Exception { 
     Random random = new Random(); 

     // Sleep a random length of time, from 5-10s 
     Thread.sleep(random.nextInt(5000) + 5000); 

     // Compute x 
     int x = random.nextInt(500); 
     System.out.println("Computed " + x); 

     return x; 
    } 

} 

public class Program { 

    public static void main(String[] args) { 
     // Thread pool size 
     final int POOL_SIZE = 10; 

     // There will be 39 workers 
     final int N = 39; 

     System.out.println("Starting " + N + " workers"); 

     // Create the workers 
     Collection<Callable<Integer>> workers = new ArrayList<Callable<Integer>>(N); 

     for (int i = 0; i < N; i++) { 
      workers.add(new Worker()); 
     } 

     // Create the executor service 
     ExecutorService executor = new ScheduledThreadPoolExecutor(POOL_SIZE); 

     // Execute all the workers, wait for the results 
     List<Future<Integer>> results = null; 

     try { 
      // Executes all tasks and waits for them to finish 
      results = executor.invokeAll(workers); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
      return; 
     } 

     // Compute the sum from the results 
     int sum = 0; 

     for (Future<Integer> future : results) { 
      try { 
       sum += future.get(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); return; 
      } catch (ExecutionException e) { 
       e.printStackTrace(); return; 
      } 
     } 

     // Compute the average 
     double average = (double) sum/(double) N; 

     System.out.println("   Sum: " + sum); 
     System.out.println("  Workers: " + N); 
     System.out.println("  Average: " + average); 
    } 

} 

輸出應該是這樣的:

Starting 39 workers 
Computed 419 
Computed 36 
Computed 338 
... 
... 
... 
Computed 261 
Computed 354 
Computed 112 
     Sum: 9526 
    Workers: 39 
    Average: 244.25641025641025 
+0

令人驚歎!我會保存這個參考。爲了玩俄羅斯方塊,我編碼遺傳算法,所有的代理商必須完成他們的遊戲,以便相互配合,所以人口可以發展。每個遊戲都將運行在不同的線程中,這要感謝您的時間,這個java.util.concurrent API是一個生命保護程序! – Fernando

+0

還有一個問題:哪種方式似乎更快,CountDownLatch或ThreadedPool?還是根本沒有區別?謝謝! – Fernando

+0

在我提供的例子中,CountDownLatch速度更快。原因是在CountDownLatch示例中,所有線程都是一次生成的。在ExecutorService示例中,最多可以同時運行10個線程,因爲那是我選擇的'POOL_SIZE'。如果在ExecutorService示例中將POOL_SIZE設置爲39,則結果應該與CountDownLatch示例幾乎相同。 –

1

您可以讓以及所有相關的東西,如ThreadPools,Executors等知道。Teaser:A Future是一個帶返回值的線程。

+0

*初始化爲N的CountDownLatch可用於使一個線程等待,直到N個線程有comp說出了一些動作,或者某些動作已經完成了N次*。那正是我需要的!感謝你們! – Fernando

0

使用ExecutorService,並提交每個任務(爲Callable)將其

,你會得到一個未來提交

每個任務
List<Future<ResultType>> results = exec.invokeAll(tasks);//tasks is a set of Callable<ResultType> 
//invokeAll blocks untill all tasks are finished 
for(Future<ResultType> f:results){ 
    ResultType x=f.get();//loop over Futures to get the result 
    //do something with x 
} 
+0

哼哼比CountDownLatch的東西更簡單。期待這一點,謝謝! – Fernando