2017-04-10 79 views
1

我想從startTime讀取一個Kafka主題到endTime,在此間隔之外讀取更多的消息是可以的,但我想確定處理間隔中的所有消息。我檢查了Simple Consumer並找到getOffsetBefore(),這會在我的startTime之前給我補償。但我不確定如何在endTime之後爲每個分區獲得偏移量。請幫忙!在時間戳後獲取卡夫卡分區的偏移量

回答

0

無法保證結束時間,因爲沒有人能預見未來。

假設您知道起始偏移量並將所有數據讀取到主題末尾。仍然可能有製片人,寫一個帶有時間戳記錄的屬於你的範圍......

請注意,Kafka的記錄時間戳是元數據,因此任何記錄都可以有任何時間戳記。代理不以任何方式解釋此時間戳(只有Streams API會這樣做)。因此,卡夫卡經紀人只保證基於偏移量的消息排序,而不是基於時間戳的排序。如果記錄沒有按時間順序排列,即一個具有較大偏移量的記錄的時間戳記小於偏移量較小的記錄 - 該記錄就是所謂的「延遲記錄」(關於時間),並且存在沒有遲到的上限。

你只能決定你的商業邏輯你想讀多遠。因此,給定起始偏移量,您只需消費消息並同時監控時間戳。除此之外,您可以在看到時間戳大於間隔的第一條記錄時停止處理 - 這將是最嚴格的處理,並且不允許任何遲到的記錄。概率,你錯過了一些數據是相對較高的。

或者您應用限制較少的上限,直到您看到一個記錄的時間戳大於interval upper bound + XX是您選擇的配置參數。由於較大的X越小,你錯過任何記錄的可能性越小。

+0

感謝您的解釋。我的問題是基於這篇文章http://cfchou.github.io/blog/2015/04/23/a-closer-look-at-kafka-offsetrequest/。我如何知道消息的時間戳?我是否在消息中指定了它? – theeminence

1

下面卡夫卡消費者API可自0.10.1版本

/** 
* Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the 
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. 
* 
* This is a blocking call. The consumer does not have to be assigned the partitions. 
* If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null 
* will be returned for that partition. 
* 
* Notice that this method may block indefinitely if the partition does not exist. 
* 
* @param timestampsToSearch the mapping from partition to the timestamp to look up. 
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater 
*   than or equal to the target timestamp. {@code null} will be returned for the partition if there is no 
*   such message. 
* @throws IllegalArgumentException if the target timestamp is negative. 
*/ 
@Override 
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { 
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { 
     // we explicitly exclude the earliest and latest offset here so the timestamp in the returned 
     // OffsetAndTimestamp is always positive. 
     if (entry.getValue() < 0) 
      throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + 
        entry.getValue() + ". The target time cannot be negative."); 
    } 
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs); 
} 
+0

這實際上直到0.10.1纔可用(並且經紀人也需要至少0.10.1)。 – Thilo

+0

@Thilo是的,正確的0.10.1,謝謝糾正 –