我知道關於配置kafka從最早或最新的消息讀取。 如果我需要讀取前一個偏移量,我們如何包含一個附加選項? 我需要這樣做的原因是由於之前處理邏輯中的某些錯誤,所讀取的較早消息需要再次處理。如何配置kafka,以便我們可以選擇從最早,最新以及從任何給定偏移量進行讀取?
0
A
回答
0
在java kafka客戶端中,有一些關於kafka消費者的方法可用於指定下一個消費位置。
公共無效尋求(TopicPartition分區, 長偏移)
覆蓋的獲取,消費者將在下一個輪詢(超時)使用偏移。如果多次爲同一分區調用此API,則將在下一個poll()中使用最新的偏移量。請注意,如果此API在消費過程中被任意使用,您可能會丟失數據,以重置提取偏移量。
這就夠了,還有seekToBeginning和seekToEnd。
0
我想回答一個類似但不完全相同的問題,讓我們看看我的信息是否可以幫助你。
首先,I have been working from this other SO question/answer
總之,要提交你的偏移併爲最常見的解決方案是動物園管理員。因此,如果您的客戶遇到錯誤或需要關閉,它可以從停止的地方恢復。
我自己我正在處理一個非常大的高容量流,我的消費者(用於測試)需要每次都從尾部開始。該文件表明我必須使用KafkaConsumer seek來聲明我的出發點。
我會盡力在這裏更新我的發現,一旦它們成功和可靠。肯定這是一個解決的問題。
+0
0.9以來最常見的存儲偏移量的地方是卡夫卡本身(位於__consumer_offsets主題中)。 Zookeeper僅用於舊消費者API中的偏移量。 –
相關問題
- 1. 如何獲得kafka的最新偏移
- 2. 我如何獲得分類學類別,以便他們在wordpress中的發佈日期從最新到最早
- 3. Storm-kafka 0.8 plus,我可以從最新的膠印中讀取嗎?
- 4. 如何配置jEdit以選擇$以及變量名稱
- 5. 循環問題以及如何從給定值中選擇最接近的值
- 6. jquery可以涉及哪些最早和最新的事件?
- 7. 如何編輯jquery datepicker以便它可以從當天選擇?
- 8. 如何從行中選擇最後24小時的行偏移
- 9. 如何我可以選擇從Oracle表
- 10. 如何選擇功能以及從表
- 11. 如何從FlinkKafkaConsumer獲取最新的消息偏移量?
- 12. 我可以選擇向Azure功能提供配置設置以及如何保護它們?
- 13. 我可以從Unix命令行獲得UTC偏移量嗎?
- 14. 如何獲取JSON響應,以便我可以提醒()以進行調試
- 15. 設置CIDR/IP以便任何人都可以從任何IP訪問它?
- 16. 如何獲得Confluent kafka C#庫中Kafka主題的最新偏移量?
- 17. 如何定義下拉選項,以便我們可以將其分配給另一個
- 18. 如何以及從DataGridView插入選定行的位置?
- 19. flink如何從kafka中讀取最新數據
- 20. Java:當我開始從kafka話題讀取時如何從當前偏移量讀取
- 21. 如何獲取UDID以進行配置?
- 22. 我可以使用out-gridview從選定行分配變量-passthru
- 23. 如何從mysql表讀取動態變量名稱和值,以便它們可以在SOAPUI中使用請求
- 24. 如何對URL進行編碼以便我可以將它傳遞給Facebook Share?
- 25. 如何鎖定表進行讀取和寫入,以便我可以執行SQL,然後再刪除鎖?
- 26. 如何配置ObjectDataSource以從ListView中選擇行
- 27. 如何配置SOLR,以便用戶可以默認進行前綴搜索?
- 28. 如何計算基於xy座標的偏移量以從numpy.memmap中讀取?
- 29. 在Kafka中讀取偏移量
- 30. Hibernate何時從二級緩存讀取以及從DB讀取?
如果有3個分區,最新的偏移量是12,13和15,如果我們想要讀取自特定時間戳以來的所有消息,我們該如何處理? –
自從時間戳以來無法讀取消息,只有偏移量。您可以讀取所有消息,然後處理您想要的消息如果消息包含時間戳記值。 – GuangshengZuo
您的意思是說,閱讀每封郵件,並在我的腳本中將其與我正在查找的時間戳進行比較? –