2017-03-20 223 views
0

我正在嘗試編寫一個簡單的java kafka消費者以使用與https://github.com/bkimminich/apache-kafka-book-examples/blob/master/src/test/kafka/consumer/SimpleHLConsumer.java中類似的代碼讀取數據。kafka java消費者未讀取數據

看起來像我的應用程序能夠連接,但它沒有獲取任何數據。請建議。

import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
//import scala.util.parsing.json.JSONObject 
import scala.util.parsing.json.JSONObject; 

public class SimpleHLConsumer { 

    private final ConsumerConnector consumer; 
    private final String topic; 

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeper); 
     props.put("group.id", groupId); 
     // props.put("zookeeper.session.timeout.ms", "5000"); 
     // props.put("zookeeper.sync.time.ms", "250"); 
     // props.put("auto.commit.interval.ms", "1000"); 

     consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); 
     this.topic = topic; 
    } 

    public void testConsumer() { 
     Map<String, Integer> topicCount = new HashMap<>(); 
     topicCount.put(topic, 1); 

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); 
     System.out.println(consumerStreams); 
     List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); 
     System.out.println(streams); 
     System.out.println(consumer); 
     for (final KafkaStream stream : streams) { 
      ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
      System.out.println("for loop"); 
      System.out.println(it); 
      System.out.println("Message from Single Topic: " + new String(it.next().message())); 
      //System.out.println("Message from Single Topic: " + new String(it.message())); 
      while (it.hasNext()) { 
       System.out.println("in While"); 
       System.out.println("Message from Single Topic: " + new String(it.next().message())); 
      } 
     } 
     // if (consumer != null) { 
     //  consumer.shutdown(); 
     // } 
    } 

    public static void main(String[] args) { 
     String topic = "test"; 
     SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic); 
     simpleHLConsumer.testConsumer(); 
    } 

} 

這裏是我在eclipse中看到的輸出。它似乎連接到我的動物園管理員,但它只是掛在那裏,根本不顯示任何消息。

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 
SLF4J: The requested version 1.6 by your slf4j binding is not compatible with [1.5.5, 1.5.6] 
SLF4J: See http://www.slf4j.org/codes.html#version_mismatch for further details. 
{test=[testgroup kafka stream]} 
[testgroup kafka stream] 
[email protected] 
for loop 
+0

在啓動使用者程序後是否創建了任何新消息?如果沒有,嘗試添加'props.put(「auto.offset.reset」,「smallest」);'創建'ConsumerConfig'實例並重新運行該程序以查看是否可以看到所消費的消息。 – amethystic

回答

0

消費者迭代器hasNext正在阻止呼叫。如果沒有新的消息可用於消費,它將無限期地阻止。

爲了驗證這一點,更改您的代碼

// Comment 2 lines below 
// System.out.println(it); 
// System.out.println("Message from Single Topic: " + new String(it.next().message())); 
// Line below is blocking. Your code will hang till next message in topic. 
// Add new message in topic using producer, message will appear in console 
while (it.hasNext()) { 

更好的方法是在單獨的線程中執行代碼。使用consumer.timeout.ms指定以ms爲單位的時間,之後消費者將拋出超時異常

// keepRunningThread is flag to control when to exit consumer loop 
while(keepRunningThread) 
{ 
    try 
    { 
    if(it.hasNext()) 
    { 
     System.out.println(new String(it.next().message())); 
    } 
    } 
    catch(ConsumerTimeoutException ex) 
    { 
    // Timeout exception waiting for kafka message 
    // Wait for 5 (or t) seconds before checking for message again 
    Thread.sleep(5000); 
    } 
}‍‍‍‍‍‍‍‍‍‍