我想從服務器的主題開始獲取所有消息。如何從kafka服務器獲取主題中的所有消息
例:
倉/ kafka-console-consumer.sh --zookeeper本地主機:2181 --topic testTopic --from-開始
當使用上述控制檯命令,我希望能夠從一開始就獲得主題中的所有消息,但是從開始使用Java代碼後,我無法使用主題中的所有消息。
我想從服務器的主題開始獲取所有消息。如何從kafka服務器獲取主題中的所有消息
例:
倉/ kafka-console-consumer.sh --zookeeper本地主機:2181 --topic testTopic --from-開始
當使用上述控制檯命令,我希望能夠從一開始就獲得主題中的所有消息,但是從開始使用Java代碼後,我無法使用主題中的所有消息。
最簡單的方法是啓動消費者並排除所有消息。現在我不知道你有多少分區在你的主題,你是否已經擁有了一個現有的消費羣或沒有,但你有幾種選擇:
看一看這個API:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
1)如果您的消費者已經在同一個消費羣中,並且仍想從頭開始消費,則應使用API文檔中列出的seek
選項,並將該組中的每位消費者的偏移設置爲0。這將從一開始就開始消耗。
2)否則,你可以在一個新的消費羣&中啓動一些消費者,你不必擔心尋求。 PS:如果您對卡夫卡有更多問題,請記住在將來提供有關您的設置的更多詳細信息。很多事情取決於你如何配置你的基礎設施&你如何更喜歡它,因此會因案例而異。
對於一個新的消費羣體,您還需要設置'auto.offset.reset = earliest' –
Btw:只是提出了這個SO文檔:) https://stackoverflow.com/documentation/proposed/changes/81323 –
此外,如果您可以在文檔答案中明確指定Kafka版本,那將是一件好事。這是因爲在處理消費者偏差方面存在很大差異。此前它是動物園管理員(0.9之前)的工作,但現在由kafka主題「__consumer_offset」處理。我發現很多人因爲版本之間的變化而感到困惑。 –
TopicPartition topicPartition = new TopicPartition(topic,0);列表分區= Arrays.asList(topicPartition); consumer.assign(分區); consumer.seekToBeginning(分區);
我爲SO文檔提出了這個建議:) https://stackoverflow.com/documentation/proposed/changes/81323 –