0
我正在嘗試編寫一個簡單的java kafka消費者以使用與https://github.com/bkimminich/apache-kafka-book-examples/blob/master/src/test/kafka/consumer/SimpleHLConsumer.java中類似的代碼讀取數據。kafka java消費者未讀取數據
看起來像我的應用程序能夠連接,但它沒有獲取任何數據。請建議。
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
//import scala.util.parsing.json.JSONObject
import scala.util.parsing.json.JSONObject;
public class SimpleHLConsumer {
private final ConsumerConnector consumer;
private final String topic;
public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
// props.put("zookeeper.session.timeout.ms", "5000");
// props.put("zookeeper.sync.time.ms", "250");
// props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
this.topic = topic;
}
public void testConsumer() {
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
System.out.println(consumerStreams);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
System.out.println(streams);
System.out.println(consumer);
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("for loop");
System.out.println(it);
System.out.println("Message from Single Topic: " + new String(it.next().message()));
//System.out.println("Message from Single Topic: " + new String(it.message()));
while (it.hasNext()) {
System.out.println("in While");
System.out.println("Message from Single Topic: " + new String(it.next().message()));
}
}
// if (consumer != null) {
// consumer.shutdown();
// }
}
public static void main(String[] args) {
String topic = "test";
SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
simpleHLConsumer.testConsumer();
}
}
這裏是我在eclipse中看到的輸出。它似乎連接到我的動物園管理員,但它只是掛在那裏,根本不顯示任何消息。
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
SLF4J: The requested version 1.6 by your slf4j binding is not compatible with [1.5.5, 1.5.6]
SLF4J: See http://www.slf4j.org/codes.html#version_mismatch for further details.
{test=[testgroup kafka stream]}
[testgroup kafka stream]
[email protected]
for loop
在啓動使用者程序後是否創建了任何新消息?如果沒有,嘗試添加'props.put(「auto.offset.reset」,「smallest」);'創建'ConsumerConfig'實例並重新運行該程序以查看是否可以看到所消費的消息。 – amethystic