我想從startTime讀取一個Kafka主題到endTime,在此間隔之外讀取更多的消息是可以的,但我想確定處理間隔中的所有消息。我檢查了Simple Consumer並找到getOffsetBefore(),這會在我的startTime之前給我補償。但我不確定如何在endTime之後爲每個分區獲得偏移量。請幫忙!在時間戳後獲取卡夫卡分區的偏移量
請注意,Kafka的記錄時間戳是元數據,因此任何記錄都可以有任何時間戳記。代理不以任何方式解釋此時間戳(只有Streams API會這樣做)。因此,卡夫卡經紀人只保證基於偏移量的消息排序,而不是基於時間戳的排序。如果記錄沒有按時間順序排列,即一個具有較大偏移量的記錄的時間戳記小於偏移量較小的記錄 - 該記錄就是所謂的「延遲記錄」(關於時間),並且存在沒有遲到的上限。
你只能決定你的商業邏輯你想讀多遠。因此,給定起始偏移量,您只需消費消息並同時監控時間戳。除此之外,您可以在看到時間戳大於間隔的第一條記錄時停止處理 - 這將是最嚴格的處理,並且不允許任何遲到的記錄。概率,你錯過了一些數據是相對較高的。
或者您應用限制較少的上限,直到您看到一個記錄的時間戳大於interval upper bound + X
* Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
* This is a blocking call. The consumer does not have to be assigned the partitions.
* If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
* will be returned for that partition.
* Notice that this method may block indefinitely if the partition does not exist.
* @param timestampsToSearch the mapping from partition to the timestamp to look up.
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
* @throws IllegalArgumentException if the target timestamp is negative.
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
// we explicitly exclude the earliest and latest offset here so the timestamp in the returned
// OffsetAndTimestamp is always positive.
if (entry.getValue() < 0)
throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
entry.getValue() + ". The target time cannot be negative.");
return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
這實際上直到0.10.1纔可用(並且經紀人也需要至少0.10.1)。 – Thilo
@Thilo是的,正確的0.10.1,謝謝糾正 –
- 1. 通過了解卡夫卡的分區和偏移量來獲取消息
- 2. 如何在卡夫卡0.10中找到主題分區的偏移量範圍?
- 3. 卡夫卡保留期後的偏移量
- 4. 卡夫卡0.9:消費從最早的卡夫卡偏移
- 5. 卡夫卡製片人時間戳
- 6. 卡夫卡分區的每個偏移量中存儲了多少條記錄?
- 7. 如何從卡夫卡的舊偏移點獲取數據?
- 8. 卡夫卡不保存偏移正確
- 9. 春季卡夫卡分區
- 10. 獲取時區偏移量
- 11. 卡夫卡:如何在卡夫卡實現循環分區
- 12. 卡夫卡消費者中的控制消息偏移量
- 13. 重置卡夫卡消費者的上一次偏移量
- 14. 卡夫卡從相同的偏移量重新啓動
- 15. 卡夫卡0.11如何重置偏移量
- 16. 從卡夫卡流重置消費者偏移量
- 17. 獲取時區的時間偏移
- 18. 卡夫卡時光倒流使用偏移
- 19. 的NodeJS和卡夫卡鍵分區
- 20. 重新部署SCDF流時,卡夫卡偏移的劑量如何改變?
- 21. 在Python中獲取特定時間和時區的偏移量
- 22. 使用駱駝卡夫卡時是否可以訪問卡夫卡分區的數量?
- 23. 卡夫卡 - 動態/任意分區
- 24. 卡夫卡流:多主題分區
- 25. 卡夫卡 - 檢查每一個分區
- 26. 卡夫卡消費羣體和分區
- 27. 阿帕奇卡夫卡 - 分區
- 28. 計算卡夫卡主題分區
- 29. 卡夫卡主題分區火花流
- 30. 卡夫卡多分區訂購
感謝您的解釋。我的問題是基於這篇文章http://cfchou.github.io/blog/2015/04/23/a-closer-look-at-kafka-offsetrequest/。我如何知道消息的時間戳?我是否在消息中指定了它? – theeminence