我正在建立一個卡夫卡消費者。我已經設置了類似於下面的恢復回調。我已啓用手動提交。我如何在恢復回調方法中確認消息,以免發生延遲。春季卡夫卡消費者手動提交恢復回撥機制
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
// TODO Auto-generated method stub
logger.debug(" In recovery callback method !!");
return null;
}
});
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
/*
* Retry template.
*/
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(initialRetryInterval);
policy.setMultiplier(retryMultiplier);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
謝謝阿爾喬姆。根據最後一條語句,我假設我能夠從retrycontext獲得確認屬性。這怎麼可能被檢索? –
'(確認)retryContext.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)' –
非常感謝! –