我在Apache Kafka
上構建了排隊系統。應用程序將產生消息到特定的Kafka topic
,並且在消費者端,我必須消耗爲該主題產生的所有記錄。
我寫消費者使用新的Java Consumer Api。 代碼看起來像永久運行kafka消費者(新消費者API)
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}
在這裏,我要永遠跑消費者,這樣的紀錄推到卡夫卡的話題生產者應立即使用並處理。
所以我的困惑是,它是一個正確的方式來使用無限的while循環(如在示例代碼中)來消費數據?