2016-09-29 175 views
0

我想獲得卡夫卡消費者的進步,即滯後。我知道下面的命令給了我時滯和其他有價值的描述。使用kafka庫查找消費者抵消滯後的代碼?

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper localhost:2182 --describe --group DemoConsumer

bin/kafka-consumer-groups.sh --zookeeper localhost:2182 --describe --group DemoConsumer

我還可以得到目前消費者使用下面的代碼片段與卡夫卡的客戶端的幫助抵消

ConsumerRecords<Integer, String> records = consumer.poll(100); 
for (ConsumerRecord<Integer, String> record : records) { 

      System.out.println("Received message: (" + record.topic()+ ", 
      " + record.partition()+ ", " + record.key() + ", " + 
      record.value() + ") at offset " + record.offset()); 
} 

但我不能找到一個代碼來獲得作爲上述兩個命令的細節。有沒有任何代碼可以使用kafka庫來查找延遲和其他細節?

回答

0

根據this topic,您可以得到消費者滯後。然而,Maven的依賴關係是不對的主題,它應該是

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.12</artifactId> 
    <version>0.10.2.0</version> 
</dependency> 

而且代碼:

AdminClient client = AdminClient.createSimplePlaintext("localhost:9092"); 
Map<TopicPartition, Object> offsets = JavaConversions.asJavaMap(
client.listGroupOffsets("groupID")); 
Long offset = (Long) offsets.get(new TopicPartition("topic", 0));