2013-08-06 83 views
1

熊與我同在,我不是在多線程編程非常精明......Java的線程池和可運行在創建可運行

我目前正在建設的是使用一個線程池的ExecutorService各種可運行的系統。這很簡單。然而,我正在研究讓runnables本身產生一個額外的runnable的可能性,這個runnable基於原始runnable中發生的事情(例如,如果成功,做到這一點,如果失敗,做到這一點等等,因爲有些任務必須先於其他任務完成執行)。應該注意的是,主線程不需要被通知這些任務的結果,儘管它可能對於處理異常很方便,即如果不能聯繫外部服務並且所有線程都因此拋出異常,則停止提交任務並定期檢查外部服務,直到恢復。這並非完全必要,但它會很好。

即,提交任務A.任務A做一些事情。如果一切順利,任務A將執行任務B.如果某些事情不能正常工作或引發異常,請執行任務C.每個子任務也可能有其他任務,但只有幾個級別深。我寧願在一個任務中做這樣的事情,而不是大型的,咆哮的條件,因爲這種方法允許更大的靈活性。

但是,我不確定這將如何影響線程池。我假設從池中的一個線程內創建的任何附加線程都將存在池外,因爲它們本身並未直接提交到池中。這是一個正確的假設嗎?如果是這樣,這可能是一個壞主意(好吧,如果不是,它可能不是一個好主意),因爲它可能會導致更多的線程,因爲原來的線程完成,並提交一個新的任務,而線程從先前的任務仍然在進行(並且可能會持續比其他任務長得多)。

我也考慮過將這些實現爲Callables,並在返回的Future中放置響應對象,然後根據響應將相應的Callable添加到線程池中。但是,這會將所有操作都綁定到主線程,這似乎是不必要的瓶頸。我想我可以將一個Runnable放入池中,該池本身處理Callable和後續動作的執行,但是然後我得到兩倍的線程數。

我在這裏的正確軌道上,還是我完全脫軌?

回答

0

有很多方法可以做你想做的。你需要小心,不要創建太多的線程。

以下是一個示例,您可以使用ExecutorCompletionService更高效,也可以使用Runnable's。

import java.util.ArrayList; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 


public class ThreadsMakeThreads { 

    public static void main(String[] args) { 
     new ThreadsMakeThreads().start(); 
    } 

    public void start() { 
     //Create resources 
     ExecutorService threadPool = Executors.newCachedThreadPool(); 
     Random random = new Random(System.currentTimeMillis()); 
     int numberOfThreads = 5; 

     //Prepare threads 
     ArrayList<Leader> leaders = new ArrayList<Leader>(); 
     for(int i=0; i < numberOfThreads; i++) { 
      leaders.add(new Leader(threadPool, random)); 
     } 

     //Get the results 
     try { 
      List<Future<Integer>> results = threadPool.invokeAll(leaders); 
      for(Future<Integer> result : results) { 
       System.out.println("Result is " + result.get()); 
      } 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } catch (ExecutionException e) { 
      e.printStackTrace(); 
     } 

     threadPool.shutdown(); 

    } 

    class Leader implements Callable<Integer> { 

     private ExecutorService threadPool; 
     private Random random; 

     public Leader(ExecutorService threadPool, Random random) { 
      this.threadPool = threadPool; 
      this.random = random; 
     } 

     @Override 
     public Integer call() throws Exception { 
      int numberOfWorkers = random.nextInt(10); 
      ArrayList<Worker> workers = new ArrayList<Worker>(); 
      for(int i=0; i < numberOfWorkers; i++) { 
       workers.add(new Worker(random)); 
      } 
      List<Future<Integer>> tasks = threadPool.invokeAll(workers); 
      int result = 0; 
      for(Future<Integer> task : tasks) { 
       result += task.get(); 
      } 
      return result; 
     } 

    } 

    class Worker implements Callable<Integer> { 

     private Random random; 

     public Worker(Random random) { 
      this.random = random; 
     } 

     @Override 
     public Integer call() throws Exception { 
      return random.nextInt(100); 
     } 

    } 
} 
0

從其他任務提交任務到線程池是非常有意義的想法。但是我擔心你會想到在不同的線程上運行新的任務,真的可以吃掉所有的內存。只需在創建池時設置線程數的限制,並將新任務提交到該線程池。

這種方法可以在不同的方向進一步闡述。首先,使用接口方法將任務視爲普通對象,並讓該方法決定是否要將此對象提交給線程池。這要求每個任務都知道其線程池 - 在創建時將其作爲參數傳遞。更方便的是,將線程池作爲線程局部變量繼續引用。

您可以輕鬆地模擬函數式編程:對象表示函數調用,並且對於每個參數都有對應的set方法。當所有參數都被設置時,對象被提交給線程池。

另一個方向是角色編程:任務類具有單一方法,但可以多次調用,如果前一個參數尚未處理,set方法不會將任務提交給線程池,而只是存儲它的論點在隊列中。 run()方法處理來自隊列的所有可用參數,然後返回。

所有這些功能都在數據流庫https://github.com/rfqu/df4j中實現。我故意寫它支持基於任務的並行性。

+0

直接在另一個任務中執行的任務不是一項任務,它只是一種方法。重新說一下你的問題,這樣任務對於線程池(Runnable或Callable)來說意味着一件工作,然後,我可能會明白你想要什麼。 –