在同一頁面
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
它說,尋找偏移讀取
卡夫卡包括兩個常量的幫助,kafka.api.OffsetRequest.EarliestTime()找到數據的開始在日誌中並從那裏開始流式傳輸,kafka.api.OffsetRequest.LatestTime()將只傳輸新消息。不要認爲偏移量0是開始偏移量,因爲消息隨時間超出日誌。
答案有點不相關,因爲OP描述的情況與0無關,因爲它是起始偏移量 - 它是壓縮消息的怪癖。 –