我正在使用Kafka網站的ConsumerGroupExample代碼測試Kafka高級消費者。我想檢索我在「Kafka服務器配置」中關於「測試」主題的所有現有消息。綜觀其他博客,auto.offset.reset應設置爲「最小」,以能夠得到的所有消息:Kafka高級消費者使用Java API從主題獲取所有消息(等效於 - from-beginning)
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
這個問題我真的是這樣的:什麼是等價的Java API調用高層次的消費是等價的:
斌/ kafka-console-consumer.sh --zookeeper本地主機:2181 --topic測試--from-開始
你用什麼來實現它?閱讀主題中的所有消息。 – Samra