我正在撰寫一項調用少數外部服務的服務。我使用期貨來表示所有這些外部服務電話的結果。我使用Guava庫提供的Futures.successfulAsList()方法將所有期貨收縮到單一的未來。關於番石榴的查詢ListenableFuture
這裏是我的代碼
List<ListenableFuture<List<T>>> futureList = new ArrayList<>();
for(int id: shardIds) {
ListeningExecutorService service =
(ListeningExecutorService) _shardMgr.getExecutorService(id);
SelectTask task = new SelectTask(_shardMgr.getReadHandle(id), sql, mapper);
ListenableFuture<List<T>> future = service.submit(task);
//Add Callback
Futures.addCallback(future, new ErrorCallBack(task),
Executors.newFixedThreadPool(1));
futureList.add(future);
}
ListenableFuture<List<List<T>>> combinedFuture =
Futures.successfulAsList(futureList);
int timeout = _dbTimeout.get();
List<T> selectResult = new ArrayList<T>();
try {
List<List<T>> result = combinedFuture.get(timeout, TimeUnit.MILLISECONDS);
for(List<T> sublist: result) {
for(T t : sublist) {
//TODO: Do we want to put a cap on how many results we return here?
//I think we should
selectResult.add(t);
}
}
}
catch(Exception ex) {
log.error("******************* Exception in parallelSelect ",ex);
throw new RuntimeException("Error in parallelSelect");
}
當我的未來(外部服務調用)中的一個發生故障ErrorCallBack的onFailure處()被調用,但是我還是combinedFuture.get(超時,TimeUnit.MILLISECONDS)的過去;並且我在迭代結果時得到了NullPointerException(T t:sublist)...。
我想到的是,當一個外部服務調用失敗,我不應該讓過去combinedFuture.get()
我做錯什麼了嗎?我甚至試圖從ErrorCallBack的onFailure方法中拋出異常。
這裏是ErrorCallBack的實現
private class ErrorCallBack<T> implements FutureCallback<List<T>> {
private final SelectTask _task;
public ErrorCallBack(SelectTask task) {
_task = task;
}
@Override
public void onFailure(Throwable t) {
log.error("ErrorCallBack:onFailure(). Enter");
DBErrorType type = DBErrorType.UNKNOWN;
try {
log.error("ErrorCallBack:onFailure(). Exception ",t);
if(t instanceof InterruptedException || t instanceof CancellationException) {
type = DBErrorType.UNKNOWN;
} else if (t instanceof SQLException || t.getCause() instanceof SQLException) {
type = DBErrorType.SQL_SYNTAX_ERROR;
} else if (t instanceof MySQLSyntaxErrorException || t.getCause() instanceof MySQLSyntaxErrorException) {
type = DBErrorType.SQL_SYNTAX_ERROR;
} else if (t instanceof ExecutionException) {
type = DBErrorType.SQL_SYNTAX_ERROR;
} else if (t instanceof TimeoutException) {
type = DBErrorType.NETWORK_ERROR;
} else {
type = DBErrorType.UNKNOWN;
}
ShardHandle handle = _task.getShardHandle();
_shardMgr.reportException(handle, type);
DBException exception = new DBException(handle.getShardInfo(), type, ErrorSeverity.CRITICAL, t);
_alertModule.handleAlert(exception.getAlertContext());
} catch(Exception ex) {
}
}
@Override
public void onSuccess(List<T> result) {}
}
非常感謝@Frank – snegi