2013-07-16 125 views
3

我有期貨清單,在完成每個未來時,我有一個應該執行的回調。番石榴期貨等待回調

我正在使用Futures.successfulAsList檢查是否所有期貨都已完成。但是,這並沒有考慮到回調的完成。

有沒有一種方法可以確保回調完成?

而不是回調,我可以使用Futures.transform包裝到另一個未來並檢查完成。但是,有了這個,我無法訪問在包裝未來中拋出的運行時異常。

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20)); 

List<ListenableFuture<Object>> futures = new ArrayList<>(); 

for (int i = 1; i <= 20; i++) { 
    final int x = i * 100; 

    ListenableFuture<Object> future = service.submit(new Callable() { 
    @Override 
    public Object call() throws Exception { 
     Thread.sleep(10000/x); 

     return x; 
    } 
    }); 

    futures.add(future); 

    Futures.addCallback(future, new FutureCallback<Object>() { 

    @Override 
    public void onFailure(Throwable t) { 
     t.printStackTrace(); 
    } 

    @Override 
    public void onSuccess(Object x) { 
     try {Thread.sleep((Integer) x * 10);} catch (Exception e) {} 

     System.out.println(x); 
    } 
    }); 
} 

ListenableFuture<List<Object>> listFuture = Futures 
    .successfulAsList(futures); 
System.out.println("Waiting..."); 
System.out.println(listFuture.get()); 
System.out.println("Done"); 
+0

這是一個真正偉大的問題,因爲它在我看來,它可能是不直接由番石榴API支持的一個相當常見的情況。 –

回答

1

如果您爲每個回調創建另一個未來並確保它將在回調中完成,那麼該如何處理?

// create "callback" future here 
futures.add(callbackFuture); 

Futures.addCallback(future, new FutureCallback<Object>() { 

    @Override 
    public void onFailure(Throwable t) { 
    t.printStackTrace(); 
    // do something with callbackFuture 
    } 

    @Override 
    public void onSuccess(Object x) { 
    try {Thread.sleep((Integer) x * 10);} catch (Exception e) {} 

    System.out.println(x); 
    // do something with callbackFuture 
    } 
}); 
+0

你指的是什麼回調Future? – vinoths

+0

只是你可以在你的回調中訪問一些未來。沒什麼特別的。甚至不需要做任何事情。 (我不熟悉番石榴類;在java.util.concurrent中,你會做一個「新的FutureTask()」,其中有一個可運行的runTable並且在它的回調函數內部調用run(),之後isDone()將返回true;番石榴當然有相同的東西) – user2543253

+0

好吧,讓我試試。 – vinoths

0

謝謝,這個作品!

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20)); 

List<ListenableFuture<Void>> futures = new ArrayList<>(); 

for (int i = 1; i <= 20; i ++) { 
    final int x = i * 100; 

    ListenableFuture<Object> future = service.submit(new Callable(){ 
    @Override 
    public Object call() throws Exception { 
     Thread.sleep(10000/x); 

     return x; 
    } 
    }); 

    //Blank runnable to evaluate write completion 
    Runnable callback = new Runnable(){ 
    @Override 
    public void run() { 
     //do nothing 
    } 
    }; 

    final ListenableFutureTask<Void> callbackFuture = ListenableFutureTask.create(callback, null); 

    futures.add(callbackFuture); 

    Futures.addCallback(future, new FutureCallback<Object>() { 

    @Override 
    public void onFailure(Throwable t) { 
     try { 
     t.printStackTrace(); 
     } 
     finally { 
     callbackFuture.run(); 
     } 
    } 

    @Override 
    public void onSuccess(Object x) { 
     try { 
     try {Thread.sleep((Integer)x*10);}catch(Exception e){} 

     System.out.println(x); 
     } 
     finally { 
     callbackFuture.run(); 
     } 
    } 
    }); 
} 

ListenableFuture<List<Void>> listFuture = Futures.successfulAsList(futures); 
System.out.println("Waiting..."); 
System.out.println(listFuture.get()); 
System.out.println("Done"); 
4

如果你只是想阻止,直到您提交已全部完成N個任務的回調,你可以創建一個CountDownLatch與N的count然後只需調用countDown()它當每個回調完成(無論成功或失敗)和await()它在你想阻止的點。

或者,你可以不喜歡你在你的答案一樣,但不是使用ListenableFutureTask<Void>和無操作Runnable,只需用SettableFuture<Void>代替並在完成它叫set(null)

+0

我用'CountDownLatch'方法去了。很好地工作,乾淨實施! – geld0r

0

實現無眠:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20)); 

    List<ListenableFuture<Object>> futures = new ArrayList<>(); 

    for (int i = 1; i <= 20; i++) { 
     final int x = i * 100; 

     ListenableFuture<Object> future = service.submit(new Callable() { 
      @Override 
      public Object call() throws Exception { 
       Thread.sleep(10000/x); 

       return x; 
      } 
     }); 

     Futures.addCallback(future, new FutureCallback<Object>() { 

      @Override 
      public void onFailure(Throwable t) { 
       t.printStackTrace(); 
      } 

      @Override 
      public void onSuccess(Object x) { 
       try {Thread.sleep((Integer) x * 10);} catch (Exception e) {} 

       System.out.println(x); 
      } 
     }); 

     /* all Callbacks added in one list (ExecutionList) and executed by order. If not defined 3d argument (Executor) 
      then callbacks executed sequentially at task thread. 
     */ 
     final SettableFuture<Object> lastCalledFuture = SettableFuture.create(); 
     Futures.addCallback(future, new FutureCallback<Object>() { 
      @Override 
      public void onSuccess(Object result) { 
       lastCalledFuture.set(result); 
      } 

      @Override 
      public void onFailure(Throwable t) { 
       lastCalledFuture.setException(t); 
      } 
     }); 
     futures.add(lastCalledFuture); 
    } 

    ListenableFuture<List<Object>> listFuture = Futures 
      .successfulAsList(futures); 
    System.out.println("Waiting..."); 
    System.out.println(listFuture.get()); 
    System.out.println("Done");