2017-09-05 35 views
1

我已經爲卡夫卡中的主題設置了TTL爲7天,我從Kafka獲取數據並將其存儲在數據庫中,但是從最近5天我的數據庫服務器已關閉,現在我必須從Kafka獲取最近5天的消息並將它們存儲在數據庫中 注意:從過去5天起,Kafka沒有問題。如何使用Java從卡夫卡獲取最近5天的消息

+0

您需要藉助偏移值進行消耗。舉個例子,如果你上一次讀取的偏移量爲100,那麼你需要從偏移量101中消耗它。 –

+0

如何在Java中使用這個偏移量概念,以及如何知道存儲消息的最後偏移值,因爲我沒有存儲任何偏移值 – Sat

回答

5

首先調用consumer.partitionsFor()方法來獲得分區你的主題

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)

然後調用consumer.offsetsForTimes()來獲得每個分區的時間戳的偏移量3天前,當最後一條消息已成功處理。

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)

然後調用consumer.seek(),以目前消費者在這一點偏移位置的時間,並繼續呼籲調查()和處理消息,你通常會。

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)

1

到上一個不錯的答案,我想補充一點,通話partitionsFor方法來獲得分區你的主題,然後做的@Hans說。

+1

謝謝。我更新了我的答案,包括適當的第一步。 –