2009-02-11 73 views
37

首先,我必須說我對API java.util.concurrent很陌生,所以也許我所做的是完全錯誤的。這是使用java.util.concurrent.FutureTask的好方法嗎?

我想要做什麼?

我有一個基本上運行2分別處理(稱爲myFirstProcessmySecondProcess)的Java應用程序,但這些處理必須在同一時間運行。

所以,我想這樣做:

public void startMyApplication() { 
    ExecutorService executor = Executors.newFixedThreadPool(2); 
    FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess); 
    FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess); 
    executor.execute(futureOne); 
    executor.execute(futureTwo); 
    while (!(futureOne.isDone() && futureTwo.isDone())) { 
     try { 
      // I wait until both processes are finished. 
      Thread.sleep(1000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
    logger.info("Processing finished"); 
    executor.shutdown(); 
    // Do some processing on results 
    ... 
} 

myFirstProcessmySecondProcess是實現Callable<Object>類,並在所有的處理是在call()方法制成。

它工作得很好,但我不確定這是否正確。 是做我想做的事的好方法嗎?如果沒有,你可以給我一些提示,以加強我的代碼(並儘可能保持簡單)。

+3

你的條件是缺少一個閉括號:) – 2009-02-11 11:32:35

回答

43

您最好使用get()方法。

futureOne.get(); 
futureTwo.get(); 

這兩者的等待線程,它完成處理的通知,這樣可以節省你的忙 - 等待 - 與 - 計時器您現在使用效率不高也不優雅。

作爲獎勵,您擁有API get(long timeout, TimeUnit unit),它允許您定義線程休眠和等待響應的最長時間,否則將繼續運行。

查看Java API瞭解更多信息。

+0

因此,而不是Callable他應該實現未來或他應該實現呢? – 2009-05-19 13:08:03

+1

對不起,我被你的鏈接弄糊塗了。 FutureTask有.get()方法。 – 2009-05-19 13:09:21

+1

有關使用CompletionService的答案比這個更好。 – 2015-01-24 20:11:35

5

如果您有興趣同時啓動線程,或者等待它們完成並進行一些進一步處理,則可能需要使用CyclicBarrier。 查看javadoc瞭解更多信息。

17

Yuval的解決方案很好。作爲替代,您也可以這樣做:

ExecutorService executor = Executors.newFixedThreadPool(); 
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess); 
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess); 
executor.execute(futureOne); 
executor.execute(futureTwo); 
executor.shutdown(); 
try { 
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
} catch (InterruptedException e) { 
    // interrupted 
} 

這種方法的優點是什麼?除了這種方式,你可以阻止執行者接受任何更多的任務(你也可以這樣做),除此之外沒有什麼區別。雖然我傾向於喜歡這個成語。另外,如果get()拋出一個異常,你最終可能會在你的代碼的一部分中假設這兩個任務都完成了,這可能是不好的。

18

以上FutureTask的使用是可以忍受的,但絕對不是慣用的。您實際上正在將您提交給ExecutorService的那件包裹額外FutureTask。您的FutureTaskExecutorService視爲Runnable。在內部,它將FutureTask-as- Runnable包裝在新的FutureTask中,並將其作爲Future<?>返回給您。

相反,您應該將Callable<Object>實例提交到CompletionService。您通過submit(Callable<V>)放下兩個Callable,然後轉身並撥打CompletionService#take()兩次(每次提交一次Callable)。這些電話會阻塞,直到其中一個提交的任務完成。

鑑於您手頭已有Executor,請圍繞它構建一個新的ExecutorCompletionService並將您的任務放在那裏。不要旋轉和睡覺等待; CompletionService#take()將會阻塞,直到任一項任務完成(無論是完成運行還是取消)或等待take()的線程中斷。

7

可以使用的invokeAll(Colelction ....)方法

package concurrent.threadPool; 

import java.util.Arrays; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class InvokeAll { 

    public static void main(String[] args) throws Exception { 
     ExecutorService service = Executors.newFixedThreadPool(5); 
     List<Future<java.lang.String>> futureList = service.invokeAll(Arrays.asList(new Task1<String>(),new Task2<String>())); 

     System.out.println(futureList.get(1).get()); 
     System.out.println(futureList.get(0).get()); 
    } 

    private static class Task1<String> implements Callable<String>{ 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000 * 10); 
      return (String) "1000 * 5"; 
     } 

    } 

    private static class Task2<String> implements Callable<String>{ 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000 * 2); 
      int i=3; 
      if(i==3) 
       throw new RuntimeException("Its Wrong"); 
      return (String) "1000 * 2"; 
     } 

    } 
} 
2

如果您futureTasks是超過2,請考慮[ListenableFuture][1]

當幾個操作應儘快開始爲另一個操作 開始 - 「扇出」 - ListenableFuture只是工作:它觸發所有的 請求的回調。隨着稍微更多的工作,我們可以「扇入式」或 觸發ListenableFuture在其他幾個 期貨全部完成後立即計算。

相關問題