2017-06-23 125 views
1

是否可以根據消息被攝取的時間段使用來自Kafka的消息?要求來自Kafka的兩個時間戳之間的消息

示例:我希望今天所有消息被攝入到0900-1000之間的一個主題(現在是1200)。

如果只有一種方法來指定開始時間,那很好 - 我的客戶可以在消息到達結束時間時停止處理消息。

我可以看到從給定偏移量請求消息,獲取第一個可用偏移量,以及獲得最早可用偏移量的方法,但不是給定時間後的所有消息。

回答

1

您可以使用offsetsForTimes方法,該方法返回其時間戳大於或等於給定時間戳的偏移量。 在這裏的官方文檔的更多信息:

https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)

得到補償,你可以尋求使用它,並開始從那裏讀之後。

+0

這聽起來非常明智!一點恥辱沒有一個用於Kafka的Node.js模塊似乎實現了這個方法:( – naxxfish

+0

Blizzard/node-rdkafka客戶端是你最好的選擇,因爲它包裝了librdkafka並支持offsetsForTimes() –