2016-09-10 35 views
0

我一直在尋找一個像這樣的情況的解決方案: 我有一個可加密的HashSet,我將這個Set提交給一個執行器並行執行。 現在我想要一旦任何提交的任務完成,我應該能夠分配一個新的Callable執行器。如何繼續向執行者提交任務

我試過這段代碼,但是如果使用executor.invoke,那麼Executor會等待,直到所有任務完成,如果我使用executor.submit,那麼任務將按順序完成。 任何幫助,將不勝感激。

package poc.threading; 

import java.util.HashSet; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class ConcurrencyPoC_UsingExecutor_GetFreeThread { 

    public static void main(String[] args) { 
     // TODO Auto-generated method stub 

     executor(); 
    } 


    public static void executor() 
    { 
     try{ 

      ExecutorService ex = Executors.newCachedThreadPool(); 

      //create a set with all callables task 
      HashSet<Callable<Object>> callables = new HashSet<>(); 
      callables.add(task1()); 
      callables.add(task2()); 
      callables.add(task3()); 
      callables.add(task4()); 

      //executes all task together but executor waits for completion of all tasks 

      List<Future<Object>> fu = ex.invokeAll(callables); 
      for(int i=0; i<fu.size(); i++) 
      { 
       System.out.println(fu.get(i).get() + " , " + Thread.currentThread().getName().toString()); 
      } 

      //executes tasks sequentially 
      for(Callable<Object> task : callables) 
      { 
       Future<Object> future = ex.submit(task); 
       System.out.println(future.get() + " , " + Thread.currentThread().getName().toString()); 
      } 
      ex.shutdownNow(); 
     } 
     catch(Exception e) 
     { 
      e.printStackTrace(); 
     } 
    } 


    public static Callable<Object> task1() throws InterruptedException 
    { 
     return new Callable<Object>() { 

      @Override 
      public Object call() throws Exception { 

       int count = 0; 
       while(count < 3) 
       { 
        System.out.println("****** SLEEP TASK1 ******* "+count); 
        Thread.sleep(500); 
        count ++; 
       } 
       return "Sleep Task Of 500 Completed"; 
      } 
     }; 
    } 


    public static Callable<Object> task2() throws InterruptedException 
    { 
     return new Callable<Object>() { 

      @Override 
      public Object call() throws Exception { 

       int count = 0; 
       while(count < 6) 
       { 
        System.out.println("****** SLEEP TASK2 ******* "+count); 
        Thread.sleep(300); 
        count ++; 
       } 
       return "Sleep Task Of 300 Completed"; 
      } 
     }; 
    } 


    public static Callable<Object> task3() throws InterruptedException 
    { 
     return new Callable<Object>() { 

      @Override 
      public Object call() throws Exception { 

       int count = 0; 
       while(count < 2) 
       { 
        System.out.println("****** SLEEP TASK3 ******* "+count); 
        Thread.sleep(1000); 
        count ++; 
       } 
       return "Sleep Task Of 1000 Completed"; 
      } 
     }; 
    } 

    public static Callable<Object> task4() throws InterruptedException 
    { 
     return new Callable<Object>() { 

      @Override 
      public Object call() throws Exception { 

       int count = 0; 
       while(count < 4) 
       { 
        System.out.println("****** SLEEP TASK4 ******* "+count); 
        Thread.sleep(600); 
        count ++; 
       } 
       return "Sleep Task Of 1000 Completed"; 
      } 
     }; 
    } 
} 
+0

問題對我來說還不清楚。如果要通知代碼的某個部分某個任務已完成,則可以使用觀察者模式並向該任務添加一個偵聽器。 –

回答

1

你的任務是在第二個例子中運行順序的原因是因爲你對未來的呼喚get()你後續任務調用submit()之前。如果您在get之前完成所有submit s,那麼它們將並行運行。

如果您正在尋找相互依賴的任務,請參閱CompletableFuture課程。這種類型的未來將允許您在第一次開始後開始另一項任務:

CompletableFuture<Object> task1 = CompletableFuture.supplyAsync(() -> task1(), ex); 
CompletableFuture<Object> task2 = task1.thenApplyAsync(task1Result -> task2(), ex);