Kafka: fetching records by timestamp in a consumer loop

I am using a Kafka 0.10.2.1 cluster. I am using the consumer's offsetsForTimes API to look up the offsets for a given timestamp, and I want to break out of the consumer loop once the end timestamp is reached.

My code looks like this:

//package kafka.ex.test;

import java.util.*;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ConsumerGroup {

    // Look up the first offset whose timestamp is >= startTime for the given partition.
    public static OffsetAndTimestamp fetchOffsetByTime(KafkaConsumer<Long, String> consumer,
                                                       TopicPartition partition, long startTime) {
        Map<TopicPartition, Long> query = new HashMap<>();
        query.put(partition, startTime);

        final Map<TopicPartition, OffsetAndTimestamp> offsetResult = consumer.offsetsForTimes(query);
        if (offsetResult == null || offsetResult.isEmpty()) {
            System.out.println(" No Offset to Fetch ");
            return null;
        }
        final OffsetAndTimestamp offsetTimestamp = offsetResult.get(partition);
        if (offsetTimestamp == null) {
            System.out.println("No Offset Found for partition : " + partition.partition());
        }
        return offsetTimestamp;
    }

    // Manually assign every partition of the topic and seek each one to the offset
    // matching startTime, or to the end of the partition if no such offset exists.
    public static KafkaConsumer<Long, String> assignOffsetToConsumer(KafkaConsumer<Long, String> consumer,
                                                                     String topic, long startTime) {
        final List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        System.out.println("Number of Partitions : " + partitionInfoList.size());
        final List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo pInfo : partitionInfoList) {
            topicPartitions.add(new TopicPartition(topic, pInfo.partition()));
        }
        consumer.assign(topicPartitions);
        for (TopicPartition partition : topicPartitions) {
            OffsetAndTimestamp offSetTs = fetchOffsetByTime(consumer, partition, startTime);
            if (offSetTs == null) {
                System.out.println("No Offset Found for partition : " + partition.partition());
                consumer.seekToEnd(Arrays.asList(partition));
            } else {
                System.out.println("FETCH offset success:"
                        + " Offset " + offSetTs.offset()
                        + " offSetTs " + offSetTs
                        + " partition " + partition.partition());
                consumer.seek(partition, offSetTs.offset());
            }
        }
        return consumer;
    }

    public static void main(String[] args) throws Exception {
        String topic = args[0];
        String group = args[1];
        String bootstrapServers = args[2];
        long start_time_Stamp = Long.parseLong(args[3]);
        long end_time_Stamp = Long.parseLong(args[4]);

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // The key type is Long, so the key deserializer must be LongDeserializer;
        // the original used StringDeserializer, which does not match KafkaConsumer<Long, String>.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
        assignOffsetToConsumer(consumer, topic, start_time_Stamp);
        System.out.println("Assigned to topic " + topic);

        boolean reachedEnd = false;
        int arr[] = {0, 0, 0, 0, 0}; // bitmap of partitions seen so far; assumes at most 5 partitions
        while (true) {
            ConsumerRecords<Long, String> records = consumer.poll(6000);
            int count = 0;
            for (ConsumerRecord<Long, String> record : records) {
                count++;
                if (arr[record.partition()] == 0) {
                    arr[record.partition()] = 1;
                }
                // Note: this exits as soon as ANY partition reaches the end timestamp.
                if (record.timestamp() >= end_time_Stamp) {
                    reachedEnd = true;
                    break;
                }
                System.out.println("record=>" + " offset=" + record.offset()
                        + " timestamp=" + record.timestamp()
                        + " :" + record);
                System.out.println("recordcount = " + count + " bitmap" + Arrays.toString(arr));
            }
            if (reachedEnd) break;
            if (records.isEmpty()) break; // don't wait for more records
        }
    }
}

I am facing the following problems:

  1. consumer.poll fails even at 1000 ms: if I use 1000 ms I have to poll in a loop several times, so right now I am using a very large value. But since the relevant offsets within the partitions have already been found, how can I set the poll timeout reliably so that data is returned immediately?

  2. My observation is that when data is returned, it is not always from all partitions, and even when data is returned from all partitions, not all records are returned. The topic holds more than 1000 records, but the number of records actually fetched and printed in the loop is far smaller (~200). Is there something wrong with how I am currently using the Kafka API?

How do I reliably break out of the loop once all the data between the start and end timestamps has been fetched, without exiting prematurely?

Answer

  1. The number of records fetched by each poll depends on your consumer configuration.
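
For illustration, these are some of the 0.10.x consumer properties that bound how much data a single poll() can return; the values below are arbitrary examples, not tuned recommendations:

props.put("max.poll.records", "500");              // upper bound on records returned by one poll()
props.put("fetch.min.bytes", "1");                 // broker replies as soon as this much data is available...
props.put("fetch.max.wait.ms", "500");             // ...or after waiting this long, whichever comes first
props.put("max.partition.fetch.bytes", "1048576"); // per-partition cap on data per fetch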

  2. You are breaking out of the loop as soon as one of the partitions reaches the end timestamp, which is not what you want. You should check that all partitions have reached the end before exiting the poll loop.
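
A minimal sketch of that idea, assuming the consumer, end_time_Stamp, and imports from the question's code are in scope; the finished set and the pause() call are additions for illustration, not part of the original:

// Exit only when every assigned partition has produced a record past end_time_Stamp.
Set<TopicPartition> finished = new HashSet<>();
while (finished.size() < consumer.assignment().size()) {
    ConsumerRecords<Long, String> records = consumer.poll(1000);
    for (ConsumerRecord<Long, String> record : records) {
        if (record.timestamp() >= end_time_Stamp) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (finished.add(tp)) {
                // stop fetching from this partition but keep polling the others
                consumer.pause(Collections.singleton(tp));
            }
            continue; // skip records past the end timestamp
        }
        // ... process the record ...
    }
    // A partition with no records past end_time_Stamp would stall this loop;
    // combine it with an empty-poll limit (see point 3) to guard against that.
}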

  3. The poll call is asynchronous, and fetch requests and responses are per node, so you may or may not get all the responses in a single poll, depending on the broker response time.
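
This is one reason why breaking out on the first empty poll, as the question's code does, can exit too early. A hedged sketch of tolerating a few empty polls in a row instead; the threshold of 3 is an arbitrary illustration:

// Do not treat a single empty poll as end-of-data: one poll() may only
// carry the responses that arrived in time from some of the brokers.
int emptyPolls = 0;
while (emptyPolls < 3) { // arbitrary threshold; tune for your brokers
    ConsumerRecords<Long, String> records = consumer.poll(1000);
    if (records.isEmpty()) {
        emptyPolls++;
        continue;
    }
    emptyPolls = 0;
    // ... process records (including the end-timestamp check from point 2) ...
}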