我想實現的場景是從卡夫卡消費一條消息,處理它,如果某些條件失敗,我不想確認消息。對於我在春季雲流參考文檔中找到的,手動確認消息:春雲流Kafka
autoCommitOffset 是否在處理消息時自動提交偏移量。如果設置爲false,則將在消息標題中提供一個確認標題以用於延遲確認。
默認值:true。
我的問題是將autoCommitOffset設置爲false後,我如何確認一條消息?一個代碼示例將非常感謝。
我想實現的場景是從卡夫卡消費一條消息,處理它,如果某些條件失敗,我不想確認消息。對於我在春季雲流參考文檔中找到的,手動確認消息:春雲流Kafka
autoCommitOffset 是否在處理消息時自動提交偏移量。如果設置爲false,則將在消息標題中提供一個確認標題以用於延遲確認。
默認值:true。
我的問題是將autoCommitOffset設置爲false後,我如何確認一條消息?一個代碼示例將非常感謝。
我提供了一個答案,這裏的問題https://github.com/spring-cloud/spring-cloud-stream/issues/575
基本上可以歸結爲設置 spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
,然後辦理確認頭:
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
謝謝Marius。另外,請分享導入聲明以瞭解確認類的API。希望看看是否有方法不承認消息。 –
這就是答案。謝謝Marius! –
我不知道什麼是「斯普林特雲流卡夫卡「是。但是,如果您使用常規的Kafka客戶端,它應該提供一種名爲「commit()」的方法或手動提交偏移量的方法。 –