2017-03-01 44 views
0

,當我使用bulkProcessor在ElasticSearch插入/更新散貨。 我想趕捕捉Elasticsearch散裝錯誤使用bulkProcessor

  • EsRejectedExecutionException
  • VersionConflictEngineException
  • DocumentAlreadyExistsException

,但它不扔東西。 它只在響應項目上設置消息。 我該如何正確處理它?例如應用型重試,如果被拒絕......發生網絡故障,只有當

public BulkResponse response bulkUpdate(.....) { 
    BulkResponse bulkWriteResult = null; 
    long startTime = System.currentTimeMillis(); 
    AtomicInteger amountOfRequests = new AtomicInteger(); 
    long esTime; 


    ElasticBulkProcessorListener listener = new ElasticBulkProcessorListener(updateOperations); 
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener) 
     .setBulkActions(MAX_BULK_ACTIONS) 
     .setBulkSize(new ByteSizeValue(maxBulkSize, ByteSizeUnit.MB)) 
     .setConcurrentRequests(5) 
     .build(); 


    updateOperations.forEach(updateRequest -> { 
     bulkProcessor.add(updateRequest); 
     amountOfRequests.getAndIncrement(); 
    }); 

try { 
    boolean isFinished = bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS); 
    if (isFinished) { 
     if (listener.getBulkWriteResult() != null) { 
      bulkWriteResult = listener.getBulkWriteResult(); 
     } else { 
      throw new Exception("Bulk updating failed, results are empty"); 
     } 
    } else { 
     throw new Exception("Bulk updating failed, received timeout"); 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

return bulkWriteResult; 
} 


public class ElasticBulkProcessorListener implements BulkProcessor.Listener { 
private long esTime = 0; 
private List<Throwable> errors; 
private BulkResponse response; 

public long getEsTime() { 
    return esTime; 
} 

@Override 
public void beforeBulk(long executionId, BulkRequest request) { 
    String description = ""; 
    if (!request.requests().isEmpty()) { 
     ActionRequest request1 = request.requests().get(0); 
     description = ((UpdateRequest) request1).type(); 
    } 

    log.info("Bulk executionID: {}, estimated size is: {}MB, number of actions: {}, request type: {}", 
      executionId, (request.estimatedSizeInBytes()/1000000), request.numberOfActions(), description); 
} 

@Override 
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
    log.info("Bulk executionID: {}, took : {} Millis, bulk size: {}", executionId, response.getTookInMillis(), response.getItems().length); 
    esTime = response.getTookInMillis(); 
    response = createBulkUpdateResult(response); 
} 

@Override 
public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
    log.error("Bulk , failed! error: ", executionId, failure); 
    throw new DataFWCoreException(String.format("Bulk executionID: %d, update operation failed", executionId), failure); 
} 

}

回答

0

故障處理程序將被調用, 任何其他情況下會得到成功處理程序。

處理異常正如我上面提到,通過分析每個響應項目,並弄清楚發生了什麼事的唯一途徑。