2016-09-19 27 views
0

我正試圖在Java上實現卡夫卡使用者。卡夫卡手動偏移管理使用案例

假設消費者包含某些可能會引發異常的消息處理邏輯。在這種情況下,消費者應該睡一段時間並重新處理最後的消息。

我的想法是使用手動偏移量管理:在失敗時不提交偏移量,因此消費者可能會從舊偏移量讀取。

在測試過程中,我發現一條消息實際上只被讀取一次,儘管事實上沒有提交偏移量。上次提交的偏移量僅在應用程序重新啓動時考慮

我的問題是:

  • 無論我做對了嗎?
  • 什麼是手動偏移管理的用例?

回答

1

KafkaConsumer保持最新偏移內存,因此,如果發生異常(你歇着吧),你想讀消息的第二次,你需要輪詢前一秒鐘使用seek()時間。

提交偏移量「僅」存在,以在客戶機關閉或崩潰時保存偏移量(即偏移量可靠地存儲在內存中)。在客戶端啓動時,獲取最新的承諾偏移量,並且客戶端只使用它自己的內存偏移量。

如果您希望將偏移提交與其他一些操作「捆綁」(例如,另一個系統中的第二個「提交」必須與承諾的Kafka偏移同步),則手動偏移管理非常有用。

+0

謝謝,在另一個系統提交似乎是偉大的用例。假設它可以用來提供*一次*傳遞語義。 – Aliaxander