我是Java新手,使用博客https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example,嘗試開發嵌入式適配器以接收來自Kafka的流。 這是代碼的一部分,它假定消費者是單線程的。Kafka簡單的消費者 - 在迭代器上獲取錯誤
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(getNumThreads()));
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<String, String>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(1);
final KafkaStream<String, String> stream = streams.get(0);
ConsumerIterator<String, String> it = stream.iterator();
while (it.hasNext()) {
// fill the tuple and output the tuple
fillAndOutputTuple();
}
我得到這個錯誤來自Eclipse IDE的it.hasNext(): 類文件迭代器包含一個簽名 '(I)Lscala /收集/ Iterator.GroupedIterator;'病態的57位
(奇怪的是,57位上不存在一樣,它提供了錯誤。)
真的很感謝所有幫助
'final KafkaStream stream = streams.get(0);'line?你使用過任何調試器嗎? –
user2720864
2014-09-11 05:44:53