0
當我使用 bulkProcessor 在 Elasticsearch 中批量插入/更新文檔時,我想捕捉 bulkProcessor 執行批量操作時產生的 Elasticsearch 錯誤,例如:
- EsRejectedExecutionException
- VersionConflictEngineException
- DocumentAlreadyExistsException
但它不會拋出任何異常,只會在響應項目(response item)上設置錯誤消息。我該如何正確處理這種情況?例如,在請求被拒絕時由應用層進行重試……似乎只有在發生網絡故障時才會拋出異常。
/**
 * Submits every request in {@code updateOperations} through a {@link BulkProcessor}
 * and waits up to {@code bulkTimeout} seconds for all in-flight bulks to finish.
 *
 * <p>The processor flushes after {@code MAX_BULK_ACTIONS} actions or
 * {@code maxBulkSize} MB and allows 5 concurrent bulk requests, so the listener
 * callbacks may run for several bulks before this method returns.
 *
 * <p>NOTE(review): the signature is a placeholder in this snippet; the parameter
 * list and the {@code throws} clause are not visible and are left untouched.
 *
 * @return the result exposed by the listener, or {@code null} if the calling
 *         thread was interrupted while waiting (the interrupt flag is restored)
 * @throws Exception if the wait timed out or the listener holds no result
 */
public BulkResponse response bulkUpdate(.....) {
    BulkResponse bulkWriteResult = null;
    ElasticBulkProcessorListener listener = new ElasticBulkProcessorListener(updateOperations);
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
            .setBulkActions(MAX_BULK_ACTIONS)
            .setBulkSize(new ByteSizeValue(maxBulkSize, ByteSizeUnit.MB))
            .setConcurrentRequests(5)
            .build();
    updateOperations.forEach(updateRequest -> bulkProcessor.add(updateRequest));
    try {
        // awaitClose flushes whatever is still buffered and blocks until the
        // outstanding bulks complete or the timeout elapses.
        boolean isFinished = bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS);
        if (isFinished) {
            if (listener.getBulkWriteResult() != null) {
                bulkWriteResult = listener.getBulkWriteResult();
            } else {
                throw new Exception("Bulk updating failed, results are empty");
            }
        } else {
            throw new Exception("Bulk updating failed, received timeout");
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still
        // observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return bulkWriteResult;
}
public class ElasticBulkProcessorListener implements BulkProcessor.Listener {
private long esTime = 0;
private List<Throwable> errors;
private BulkResponse response;
public long getEsTime() {
return esTime;
}
@Override
public void beforeBulk(long executionId, BulkRequest request) {
String description = "";
if (!request.requests().isEmpty()) {
ActionRequest request1 = request.requests().get(0);
description = ((UpdateRequest) request1).type();
}
log.info("Bulk executionID: {}, estimated size is: {}MB, number of actions: {}, request type: {}",
executionId, (request.estimatedSizeInBytes()/1000000), request.numberOfActions(), description);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
log.info("Bulk executionID: {}, took : {} Millis, bulk size: {}", executionId, response.getTookInMillis(), response.getItems().length);
esTime = response.getTookInMillis();
response = createBulkUpdateResult(response);
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
log.error("Bulk , failed! error: ", executionId, failure);
throw new DataFWCoreException(String.format("Bulk executionID: %d, update operation failed", executionId), failure);
}
}