1
我正在使用spring-integration-kafka
實施具有自定義確認機制的卡夫卡消費者。使用this example的代碼。通過拒絕確認重新讀取來自卡夫卡主題的消息
我想要實現的是當拋出異常時,確認不應該發回給Kafka(即不應該執行偏移提交),因此下一個fromKafka.receive(10000)
方法調用將返回與先前相同的消息一。
但是我面臨一個問題:即使確認沒有發送到Kafka,消費者也知道下一個消息的偏移量,並繼續讀取新的消息,儘管偏移量主題中的偏移量仍然存在不變。
如何在發生某些故障時讓消費者重讀消息?
謝謝,我會嘗試'ErrorHandler'的方式。但無論如何,適配器如何知道下一條消息的偏移量?它是否在內部存儲當前的偏移量? – Aliaxander