2017-05-12 48 views
0

我使用0.9版本的卡夫卡來消費數據,但程序總是運行到consumer.poll(100)的行,然後它不會繼續運行。我使用了kafka-clients-0.9.0.0的jar。
卡夫卡:kafka_2.10-0.9.0.0
飼養員:飼養員-3.4.5kafka-0.9 cann't consumer data

public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "10.28.176.11:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("tmp")); 
    try { 
     while (true) { 
      System.out.println("start comsuming..."); 
      ConsumerRecords<String, String> records = consumer.poll(100); 
      System.out.println("start print data......."); 
      for (ConsumerRecord<String, String> record : records) 
       System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
     } 
    } finally { 
     consumer.close(); 
    } 
} 

日誌:

enter image description here

+0

因爲我使用這裏提到的方法(http://stackoverflow.com/questions/37770024/kafka-0-9-0-1-java-consumer-stuck-in-awaitmetadataupdate),設置變量ADVERTISED_PORT和ADVERTISED_HOST ,但它仍然卡在consumer.poll調用中。 –

回答

0

我已經想通了。我刪除了zookeeper中的所有kafka元數據,例如/ borkers/admin/comsumers/config/controller/controller_epoch/isr_change_notification,然後重新啓動了kafka,錯誤消失了。我想也許卡夫卡集羣有一些錯誤。