2012-04-13 31 views
0

我想知道是否有方法來監視線程的生命,但我會解釋我正在做的事情的上下文,所以也許有更好的方法來做到這一點。如何監控線程的生命?

基本上我有x個線程正在處理隊列並處理它,如果一個線程獲得了可接受的結果,它將進入一個解決方案隊列,否則數據將被丟棄或進一步處理。

我的問題是在我的主線程中我有一個像while(!solutions_results.isEmpty())它保存數據(現在它的打印到一個文件,但後來可能數據庫)。顯而易見的問題是,一旦清除了解決方案隊列的完成並完成工作,即使其他線程仍將數據放入隊列中。

我不知道處理這個問題的最佳方法(也許有專門的線程,只保存解決方案隊列?),但我想如果我能以某種方式監視其他線程的生活,那麼沒有更多數據進入解決方案隊列的機會。

如果有更好的方法可以做到這一點,請告訴我,否則有一種方法可以告訴其他線程完成後(我不能等待執行程序在運行此過程之前完全完成,因爲它可能會變得很大並且不希望它只是坐在內存中,理想情況下想在接近它的時候處理它,但它不依賴於時間)?

+0

這是[生產者消費者問題](http://en.wikipedia.org/wiki/Producer-consumer_problem)。也許那篇文章會給你一些見解。 – jpm 2012-04-13 17:06:51

+0

@jpm非常感謝你,我會讀它.. – 2012-04-13 17:12:36

回答

2

如果使用ExecutorService來運行你的線程的工作,那麼你可以使用awaitTermination()方法來知道什麼時候所有的線程已經完成:

ExecutorService pool = Executors.newFixedThreadPool(10); 
pool.submit(yourSolutionsRunnable); 
pool.submit(yourSolutionsRunnable); 
... 
// once you've submitted your last job you can do 
pool.shutdown(); 

然後你就可以等待所有提交到完成作業:

pool.waitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); 

如果你的線程需要保持提交他們的解決方案運行後這將變得更加複雜。如果你編輯你的問題,並讓這個更明顯,我會編輯我的答案。


編輯:

哦,我看你要處理一些沿途的結果,但不會停止,直到所有的線程完成。

您可以使用pool.isTerminated()測試,它會告訴您是否所有作業都已完成。所以,你的循環會看起來像:

// this is the main thread so waiting for solutions in a while(true) loop is ok 
while (true) { 
    // are all the workers done? 
    if (pool.isTerminated()) { 
     // if there are results process one last time 
     if (!solutions_results.isEmpty()) { 
      processTheSolutions(); 
     } 
     break; 
    } else { 
     if (solutions_results.isEmpty()) { 
      // wait a bit to not spin, you could also use a wait/notify here 
      Thread.sleep(1000); 
     } else { 
      processTheSolutions(); 
     } 
    } 
} 

編輯:

你也可以有兩個線程池,一個用於生成解決方案,另一個處理。您的主線程可能會等待工作池清空,然後等待解決方案處理池。工作人員池會將解決方案(如果有的話)提交到解決方案池中。如果需要,您可以在解決方案處理池中擁有1個線程或更多線程。

ExecutorService workerPool = Executors.newFixedThreadPool(10); 
final ExecutorService solutionsPool = Executors.newFixedThreadPool(1); 
solutionsPool.submit(workerThatPutsSolutionsIntoSolutionsPool); 
... 
// once you've submitted your last worker you can do 
workerPool.shutdown(); 

workerPool.waitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); 
// once the workers have finished you shutdown the solutions pool 
solutionsPool.shutdown(); 
// and then wait for it to finish 
solutionsPool.waitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); 
+0

但會阻止我的解決方案隊列的處理等到所有線程完成?有一種方法可以在線程處於活動狀態時保持解決方案隊列運行,因此我可以將它包裝在一個while循環中? – 2012-04-13 17:10:59

+0

灰色的權利,線程仍在執行。有兩個隊列,一個是工作隊列,另一個是解決方案隊列。只要工作隊列不是空的,我就有固定數量的線程處理它。 – 2012-04-13 17:14:42

+0

非常感謝你灰色。有效!我必須修改一個部分,如果數據集很小,並且線程在這個進程正在休眠時完成,則第二次編輯中的if(pool.isTerminated()){'會導致它退出。我將它改爲'if(executor.isTerminated()&& solutions_results.isEmpty()){'並且它似乎工作。我會測試更多,但非常感謝,這正是我所期待的。週末愉快。 – 2012-04-13 19:46:47

0

我不太瞭解你正在處理的行爲要求,但是如果你希望主線程阻塞,直到你所有的子線程完成,你應該看看Thread類的join方法。

http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/Thread.html#join()

只需運行調用你的子線程中的每一個連接方法,當它退出循環,可以確保所有線程都完成工作的主線程內循環。

+0

但是不會讓我的程序在啓動之前等待所有進程完成嗎?該程序運行很長一段時間,我寧願處理它,因爲它來。 – 2012-04-13 17:08:33

+0

啊好吧我誤解了這個問題。 – 2012-04-13 17:10:50

+0

沒問題,我正在學習新東西,所以99%的時間我不解釋這個權利。 – 2012-04-13 17:11:48

0

只需保留活動線程的列表。如果同時添加/刪除線程,您希望它同步以防止它被丟棄。或者使用類似java.util.concurrent.ConcurrentLinkedQueue的東西,它可以處理多個線程本身。當您啓動它時將每個線程添加到列表中。每個線程都應該在它停止之前從列表中移除。當列表爲空時,您的所有線程都已完成。

編輯:時間是重要的。 首先,主線程必須將工作線程放入列表中。如果他們把自己放到列表中,那麼主線程可以在某些線程已經從列表中刪除自己的時候檢查列表,並且所有其他線程雖然已經啓動,但尚未開始執行 - 因此尚未將自己置於名單。它會認爲一切都沒有完成。 二,主線程必須將每個工作線程放在的列表中,然後纔會啓動它。否則,該線程可能會完成並試圖從列表中刪除自己,之前主線程將其添加到列表中。然後,列表將永遠不會變空,程序永遠不會結束。

0

也許java.util.concurrent.ExecutorCompletionService在這裏很有用。

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.CancellationException; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

public class monitor_life_of_threads 
{ 
    /** 
    * First, convert all of your threads to instances of Callable (easy to do), and have them each return an instance some class (I'm using Integer below just as 
    * an example). 
    * This will help simplify things. 
    */ 

    public static void main (String args[]) 
    { 
     final monitor_life_of_threads worker = new monitor_life_of_threads(); 

     worker.executeCallablesAndUseResults(); 

     System.exit (0); 
    } 

    private void executeCallablesAndUseResults() 
    { 
     List < Callable <Result>> list = new ArrayList <>(); 
     populateInputList (list); 

     try 
     { 
      doWork (list); 
     } 
     catch (InterruptedException e) 
     { 
      e.printStackTrace(); 
     } 
     catch (ExecutionException e) 
     { 
      e.printStackTrace(); 
     } 
     catch (CancellationException e) 
     { 
      /* 
      * Could be called if a Callable throws an InterruptedException, and if it's not caught, it can cause Future.get to hang. 
      */ 
      e.printStackTrace(); 
     } 
     catch (Exception defaultException) 
     { 
      defaultException.printStackTrace(); 
     } 
    } 

    private void doWork (Collection < Callable <Result>> callables) throws InterruptedException, ExecutionException 
    { 
     ExecutorService executorService = Executors.newCachedThreadPool(); 

     CompletionService <Result> ecs = new ExecutorCompletionService < > (executorService); 

     for (Callable <Result> callable : callables) 
      ecs.submit (callable); 

     for (int i = 0, n = callables.size(); i < n; ++i) 
     { 
      Result r = ecs.take().get(); 
      if (r != null) 
       use (r); // This way you don't need a second queue. 
     } 

     executorService.shutdown(); 

    } 

    private void use (Result result) 
    { 
     // Write result to database, output file, etc. 

     System.out.println ("result = " + result); 
    } 

    private List < Callable <Result>> populateInputList (List < Callable <Result>> list) 
    { 
     list.add (new Callable <Result>() { 

      @Override 
      public Result call() throws Exception 
      { 
       // Do some number crunching, then return a 5. 
       return new Result (5); 
      } 

     }); 

     list.add (new Callable <Result>() { 

      @Override 
      public Result call() throws Exception 
      { 
       // Do some number crunching, then return an 8. 
       return new Result (8); 
      } 

     }); 

     list.add (new Callable <Result>() { 

      @Override 
      public Result call() throws Exception 
      { 
       // Do some number crunching, but fail and so return null. 
       return null; 
      } 

     }); 

     return list; 
    } 
} 

class Result 
{ 
    private Integer i; 

    Result (Integer i) 
    { 
     this.i = i; 
    } 

    public String toString() 
    { 
     return Integer.toString (i); 
    } 
}