我已經爲卡夫卡中的主題設置了TTL爲7天,我從Kafka
獲取數據並將其存儲在數據庫中,但是從最近5天我的數據庫服務器已關閉,現在我必須從Kafka
獲取最近5天的消息並將它們存儲在數據庫中 注意:從過去5天起,Kafka
沒有問題。如何使用Java從卡夫卡獲取最近5天的消息
1
A
回答
5
首先調用consumer.partitionsFor()方法來獲得分區你的主題
然後調用consumer.offsetsForTimes()來獲得每個分區的時間戳的偏移量3天前,當最後一條消息已成功處理。
然後調用consumer.seek(),以目前消費者在這一點偏移位置的時間,並繼續呼籲調查()和處理消息,你通常會。
1
到上一個不錯的答案,我想補充一點,通話partitionsFor
方法來獲得分區你的主題,然後做的@Hans說。
+1
謝謝。我更新了我的答案,包括適當的第一步。 –
相關問題
- 1. 卡夫卡0.9:消費從最早的卡夫卡偏移
- 2. 查詢卡夫卡消息
- 3. 卡夫卡消息給websocket
- 4. 春天卡夫卡不支持大消息的消費者
- 5. 卡夫卡消耗相反的消息
- 6. 在卡夫卡消費活的消息
- 7. 卡夫卡消費者如何選擇消費最近的經紀商?
- 8. 卡夫卡延遲消息消耗
- 9. 的Java Vs的星火消耗卡夫卡消息
- 10. 獲取卡夫卡消費者名單的Java api
- 11. 消費消費使用卡夫卡消費者 - Java
- 12. 如何從卡夫卡獲得鳴謝
- 13. Spark結構化流只從卡夫卡的一個分區獲取消息
- 14. 卡夫卡生產者跳過消息
- 15. 解密卡夫卡Avro消息
- 16. 卡夫卡製作人消息流
- 17. 如何從卡夫卡用卡夫卡流通過間隔
- 18. 如何從一個Java servlet通過Kafka.Producer將消息發送到卡夫卡
- 19. 如何在Java中的消費羣獲取ConsumerOffset(存儲在卡夫卡)?
- 20. 卡夫卡python消費者開始時讀取所有消息
- 21. 向卡夫卡發佈消息的最佳方式是什麼?
- 22. 無法從卡夫卡主題讀取消息使用Spark Streaming Kafka
- 23. 如何從卡夫卡隊列直接從卡夫卡隊列讀取數據
- 24. 獲取從卡夫卡消費控制檯腳本
- 25. 卡夫卡Procuder不使用SSL生成SSL上的消息
- 26. 卡夫卡消費者 - Java客戶端
- 27. 卡夫卡消費者與JAVA
- 28. 如何從生產者消費卡夫卡的消費者?
- 29. 如何檢索卡夫卡消息的特定計數主題
- 30. 如何從卡夫卡的舊偏移點獲取數據?
您需要藉助偏移值進行消耗。舉個例子,如果你上一次讀取的偏移量爲100,那麼你需要從偏移量101中消耗它。 –
如何在Java中使用這個偏移量概念,以及如何知道存儲消息的最後偏移值,因爲我沒有存儲任何偏移值 – Sat