2015-02-11 28 views
9

我有一個簡單的卡夫卡消費者在Java中使用下面的代碼卡夫卡消費掛着.hasNext在java中

public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()&& !done){ 
      try { 
       System.out.println("Parsing data"); 
       byte[] data = it.next().message(); 
       System.out.println("Found data: "+data); 
       values.add(data); // array list 
      } catch (InvalidProtocolBufferException e) { 
       e.printStackTrace(); 
      } 
     } 
     done = true; 
    } 

當發佈消息,數據被成功讀取,但是當它返回檢查它.hasNext(),它保持掛起狀態,永不再回來。

有什麼可以拖延這?是KafkaStream獲得

m_stream如下:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
topicCountMap.put(topic, new Integer(a_numThreads)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
executor = Executors.newFixedThreadPool(a_numThreads); 
for (final KafkaStream stream : streams) { 
    // m_stream is one of these streams 
} 

回答

11

解決的辦法是添加屬性

「consumer.timeout.ms」

現在,當達到超時一個ConsumerTimeoutException是拋出