2016-06-29 264 views
0

我在Apache Kafka上構建了排隊系統。應用程序將產生消息到特定的Kafka topic,並且在消費者端,我必須消耗爲該主題產生的所有記錄。
我寫消費者使用新的Java Consumer Api。 代碼看起來像永久運行kafka消費者(新消費者API)

Properties props = new Properties(); 
        props.put("bootstrap.servers", kafkaBrokerIp+":9092"); 
        props.put("group.id",groupId); 
        props.put("enable.auto.commit", "true"); 
        props.put("session.timeout.ms", "30000"); 
        props.put("auto.offset.reset", "earliest"); 
     props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer(props); 
        consumer.subscribe(Arrays.asList("consumertest")); 
        while (true) { 
         ConsumerRecords<String, String> records = consumer.poll(100); 
         for (ConsumerRecord<String, String> record : records){ 
          System.out.println("Data recieved : "+record.value()); 
          } 
        } 

在這裏,我要永遠跑消費者,這樣的紀錄推到卡夫卡的話題生產者應立即使用並處理。
所以我的困惑是,它是一個正確的方式來使用無限的while循環(如在示例代碼中)來消費數據?

回答

0

它適用於我,但您可能想要將您的內部循環放入try/catch塊以防萬一您拋出任何異常。如果斷開連接,請考慮定期重新連接任務。

0

是的,你可以使用無限循環。其實,這不是一個繁忙的循環。在每次輪詢期間,如果數據不可用,呼叫會等待給定的時間。

long millisToWait = 100; 
consumer.poll(millisToWait); 

新客戶自動處理網絡通信問題。確保在關機時消費者優雅地關閉。