2017-04-05 30 views
1

我想告訴Kafka何時我的客戶已成功處理記錄,因此我已通過將enable.auto.commit設置爲false來關閉自動提交。我在一個主題我有兩個消息在零偏移零和1訂閱,並已創建一個消費者,以便每個電話poll將返回最多一個記錄(通過將max.poll.records設置爲1)。Kafka enable.auto.commit設置爲false但輪詢仍獲取「下一個」消息

我現在打電話consumer.poll(5000)並收到第一條消息,但我不承認;我不打電話commitSynccommitAsync。如果我現在再次撥打consumer.poll(5000),使用同一個消費者,我希望得到與我剛剛閱讀的完全相同的消息,但是,我收到第二條消息。

我如何得到consumer.poll繼續發送相同的消息,直到我明確地承認它?

+0

P.S.我正在使用IBM Message Hub(在封面下使用Kafka)。 –

回答

1

你所描述的是預期的行爲。每次您撥打poll()時,它都會返回下一條消息。您提交的偏移量僅在連接新消費者時使用,因此它知道從哪裏(重新)開始。

在MessageHub中,我們將session.timeout設置爲30秒。因此,您需要稍微快點撥打poll()以避免斷開連接。如果你處理的時間比這更長的時間,那麼我能想到的2種選擇:

  • 使用卡夫卡0.10.2和設置max.poll.interval.ms要告訴你的卡夫卡客戶端會話保持活動狀態(沒有你不必調用poll()),而你處理以前的記錄。 (0.10.1中增加了此功能,但我們不支持該版本0.10.2,因爲它能夠與0.10.0中間商合作)

  • 使用seek()返回到之前的偏移量poll因此它一直返回相同的記錄。

希望這有助於!

相關問題