2017-07-25 251 views
1

如何計算在卡夫卡發送的主題消息總數有多少,以及消費者當時消費或承諾了多少消息?卡夫卡消費者/生產者API

我initiatting卡夫卡連接器AS-

Map<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", "localhost:9092"); 
Set<String> topics = Collections.singleton("mytopic"); 

JavaPairInputDStream<String, String> directKafkaStream = 
KafkaUtils.createDirectStream(ssc, 
    String.class, String.class, StringDecoder.class, StringDecoder.class, 
    kafkaParams, topics); 

加工爲 -

directKafkaStream.foreachRDD(rdd -> { 
System.out.println("--- New RDD with " + rdd.partitions().size() 
     + " partitions and " + rdd.count() + " records"); 
rdd.foreach(record -> System.out.println(record._2)); 
}); 

FOR 2秒鐘

--- New RDD with 2 partitions and 3 records 
value-1 
value-0 
value-2 
--- New RDD with 2 partitions and 7 records 
value-3 
value-5 
value-7 
value-9 
value-4 
value-6 
value-8 
--- New RDD with 2 partitions and 8 records 
value-11 
value-10 
value-13 
... 

回答

2

你可以試試下面執行命令:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1 

然後,總結每個分區的所有計數。