幾次我用Spring Kafka API實現卡夫卡消費者與手動偏移管理:閱讀相同的消息從卡夫卡
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
在這裏,我想消費者承諾只有當someCondition
持有偏移。否則,消費者應該睡一段時間,並再次讀取相同的消息。
卡夫卡配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
當前的配置,如果someCondition == false
,消費者不提交的偏移量,但仍讀取下一個消息。如果卡夫卡acknowledgement
未執行,是否有方法讓消費者重新閱讀消息?
謝謝。現在我懷疑我是否使用手動偏移管理來實現其預期目的。似乎應該使用功能在**使用者失敗的情況下重新讀取消息**,而不是在使用消息的組件失敗時使用。 – Aliaxander
嗨@Gary Russel,你知道是否存在一些關於使用Spring Boot尋找的例子。我正在嘗試一些類似的東西來聽一個主題,但審查分區中的消息響應:https://stackoverflow.com/questions/48349993/how-to-listen-for-the-right-ack-message- from-kafka – jabrena
尋求並不是該用例的正確解決方案;在那邊看我的回答。 –