2013-04-30 247 views
2

我想實現一個簡單的生產者 - >卡夫卡 - >消費者應用程序在Java中。我能夠成功生成並消費消息,但是當我重新啓動消費者時會出現問題,其中一些已消費的消息再次被消費者從Kafka中獲取(並非所有消息,但是最後一些消息消耗的消息)。簡單的卡夫卡消費者消息傳遞重複

我在我的消費者中設置了autooffset.reset=largest,我的autocommit.interval.ms屬性設置爲1000毫秒。

這是'一些已經消耗的消息的重新傳遞'的一個已知問題,或者是否有任何其他設置,我在這裏失蹤?

基本上,有沒有一種方法可以確保以前消費的消息都不會被消費者拿走/消費?

回答

3

卡夫卡使用Zookeeper存儲消費者偏移量。由於Zookeeper操作非常緩慢,建議在消費每條消息之後提交偏移量。

可以向使用者添加關閉掛鉤,以便在退出前手動提交主題偏移量。但是,這在某些情況下不會起作用(如jvm crash或kill -9)。爲了防止這種情況,我建議實現自定義提交邏輯,在處理每個消息(文件或本地數據庫)後在本地提交偏移量,並且每1000毫秒向Zookeeper提交偏移量。在消費者啓動時,應查詢這兩個位置,並且應將最多兩個值用作消費抵消。

+0

它可能聽起來很愚蠢,但如果我們說例如實現自定義提交邏輯,那麼是否可以管理每個消息的偏移量。例如,如果我有兩個帶有時間戳值的消息,那麼我想根據時間戳設置偏移量。因此,如果第二條記錄具有較早的時間戳,則分配給它的偏移量應該小於另一個。所以,當我消費時,我會收到已經排序的消息。 – user2720864 2013-09-09 06:35:48