2014-02-07 86 views
0

我正在撰寫一項調用少數外部服務的服務。我使用期貨來表示所有這些外部服務電話的結果。我使用Guava庫提供的Futures.successfulAsList()方法將所有期貨收縮到單一的未來。關於番石榴的查詢ListenableFuture

這裏是我的代碼

List<ListenableFuture<List<T>>> futureList = new ArrayList<>(); 

for(int id: shardIds) { 
    ListeningExecutorService service = 
      (ListeningExecutorService) _shardMgr.getExecutorService(id); 
    SelectTask task = new SelectTask(_shardMgr.getReadHandle(id), sql, mapper); 


    ListenableFuture<List<T>> future = service.submit(task); 
    //Add Callback 
    Futures.addCallback(future, new ErrorCallBack(task), 
      Executors.newFixedThreadPool(1)); 
    futureList.add(future); 
} 

ListenableFuture<List<List<T>>> combinedFuture = 
     Futures.successfulAsList(futureList); 
int timeout = _dbTimeout.get(); 
List<T> selectResult = new ArrayList<T>(); 

try { 
    List<List<T>> result = combinedFuture.get(timeout, TimeUnit.MILLISECONDS); 
    for(List<T> sublist: result) { 
     for(T t : sublist) { 
      //TODO: Do we want to put a cap on how many results we return here? 
      //I think we should 
      selectResult.add(t); 
     } 
    } 
} 
catch(Exception ex) { 
    log.error("******************* Exception in parallelSelect ",ex); 
    throw new RuntimeException("Error in parallelSelect"); 
} 

當我的未來(外部服務調用)中的一個發生故障ErrorCallBack的onFailure處()被調用,但是我還是combinedFuture.get(超時,TimeUnit.MILLISECONDS)的過去;並且我在迭代結果時得到了NullPointerException(T t:sublist)...。

我想到的是,當一個外部服務調用失敗,我不應該讓過去combinedFuture.get()

我做錯什麼了嗎?我甚至試圖從ErrorCallBack的onFailure方法中拋出異常。

這裏是ErrorCallBack的實現

private class ErrorCallBack<T> implements FutureCallback<List<T>> { 
    private final SelectTask _task; 

    public ErrorCallBack(SelectTask task) { 
     _task = task; 
    } 

    @Override 
    public void onFailure(Throwable t) { 
     log.error("ErrorCallBack:onFailure(). Enter"); 
     DBErrorType type = DBErrorType.UNKNOWN; 
     try { 
      log.error("ErrorCallBack:onFailure(). Exception ",t); 
      if(t instanceof InterruptedException || t instanceof CancellationException) { 
       type = DBErrorType.UNKNOWN; 
      } else if (t instanceof SQLException || t.getCause() instanceof SQLException) { 
       type = DBErrorType.SQL_SYNTAX_ERROR; 
      } else if (t instanceof MySQLSyntaxErrorException || t.getCause() instanceof MySQLSyntaxErrorException) { 
       type = DBErrorType.SQL_SYNTAX_ERROR; 
      } else if (t instanceof ExecutionException) { 
       type = DBErrorType.SQL_SYNTAX_ERROR; 
      } else if (t instanceof TimeoutException) { 
       type = DBErrorType.NETWORK_ERROR; 
      } else { 
       type = DBErrorType.UNKNOWN; 
      } 
      ShardHandle handle = _task.getShardHandle(); 
      _shardMgr.reportException(handle, type); 
      DBException exception = new DBException(handle.getShardInfo(), type, ErrorSeverity.CRITICAL, t); 
      _alertModule.handleAlert(exception.getAlertContext()); 
     } catch(Exception ex) { 
     } 
    } 

    @Override 
    public void onSuccess(List<T> result) {} 
} 

回答

4

我想到的是,當一個外部服務調用失敗,我不應該讓過去combinedFuture.get()

哦,不,既然你」重新調用Futures.succcessfulAsList(),其名稱暗示爲returns the results of the successful Futures(對於失敗者爲null)。對於你想要的行爲,你應該打電話Futures.allAsList(),它給你一個Future,如果它的任何組件失敗失敗。

由於您沒有檢查結果中的空值,因此您將獲得NPE。

+0

非常感謝@Frank – snegi