回答

0

在java kafka客戶端中,有一些關於kafka消費者的方法可用於指定下一個消費位置。

公共無效尋求(TopicPartition分區, 長偏移)

覆蓋的獲取,消費者將在下一個輪詢(超時)使用偏移。如果多次爲同一分區調用此API,則將在下一個poll()中使用最新的偏移量。請注意,如果此API在消費過程中被任意使用,您可能會丟失數據,以重置提取偏移量。

這就夠了,還有seekToBeginning和seekToEnd。

+0

如果有3個分區,最新的偏移量是12,13和15,如果我們想要讀取自特定時間戳以來的所有消息,我們該如何處理? –

+0

自從時間戳以來無法讀取消息,只有偏移量。您可以讀取所有消息,然後處理您想要的消息如果消息包含時間戳記值。 – GuangshengZuo

+0

您的意思是說,閱讀每封郵件,並在我的腳本中將其與我正在查找的時間戳進行比較? –

0

我想回答一個類似但不完全相同的問題,讓我們看看我的信息是否可以幫助你。

首先,I have been working from this other SO question/answer

總之,要提交你的偏移併爲最常見的解決方案是動物園管理員。因此,如果您的客戶遇到錯誤或需要關閉,它可以從停止的地方恢復。

我自己我正在處理一個非常大的高容量流,我的消費者(用於測試)需要每次都從尾部開始。該文件表明我必須使用KafkaConsumer seek來聲明我的出發點。

我會盡力在這裏更新我的發現,一旦它們成功和可靠。肯定這是一個解決的問題。

+0

0.9以來最常見的存儲偏移量的地方是卡夫卡本身(位於__consumer_offsets主題中)。 Zookeeper僅用於舊消費者API中的偏移量。 –

相關問題