我想實現一個簡單的生產者 - >卡夫卡 - >消費者應用程序在Java中。我能夠成功生成並消費消息,但是當我重新啓動消費者時會出現問題,其中一些已消費的消息再次被消費者從Kafka中獲取(並非所有消息,但是最後一些消息消耗的消息)。簡單的卡夫卡消費者消息傳遞重複
我在我的消費者中設置了autooffset.reset=largest
,我的autocommit.interval.ms
屬性設置爲1000毫秒。
這是'一些已經消耗的消息的重新傳遞'的一個已知問題,或者是否有任何其他設置,我在這裏失蹤?
基本上,有沒有一種方法可以確保以前消費的消息都不會被消費者拿走/消費?
它可能聽起來很愚蠢,但如果我們說例如實現自定義提交邏輯,那麼是否可以管理每個消息的偏移量。例如,如果我有兩個帶有時間戳值的消息,那麼我想根據時間戳設置偏移量。因此,如果第二條記錄具有較早的時間戳,則分配給它的偏移量應該小於另一個。所以,當我消費時,我會收到已經排序的消息。 – user2720864 2013-09-09 06:35:48