2015-10-21 40 views
1

我正在使用spring-integration-kafka實施具有自定義確認機制的卡夫卡消費者。使用this example的代碼。通過拒絕確認重新讀取來自卡夫卡主題的消息

我想要實現的是當拋出異常時,確認不應該發回給Kafka(即不應該執行偏移提交),因此下一個fromKafka.receive(10000)方法調用將返回與先前相同的消息一。

但是我面臨一個問題:即使確認沒有發送到Kafka,消費者也知道下一個消息的偏移量,並繼續讀取新的消息,儘管偏移量主題中的偏移量仍然存在不變。

如何在發生某些故障時讓消費者重讀消息?

回答

1

有目前未重新獲取失敗消息的任何支持

一兩件事你可以做的就是添加重試(例如,使用請求處理程序重試諮詢)消息驅動適配器的下游。

通過不反應,消息將在重新啓動後傳遞,但不會在當前實例化期間傳遞。因爲消息被預取到適配器中,所以你可以做的一件事是檢測失敗,停止適配器,排出預取的消息並重新啓動。

您可以注入一個自定義ErrorHandler來停止適配器,並向您的下游流發送信號,表明它應該忽略排水消息。

+0

謝謝,我會嘗試'ErrorHandler'的方式。但無論如何,適配器如何知道下一條消息的偏移量?它是否在內部存儲當前的偏移量? – Aliaxander