2014-02-13 56 views
6

我正在使用Kafka網站的ConsumerGroupExample代碼測試Kafka高級消費者。我想檢索我在「Kafka服務器配置」中關於「測試」主題的所有現有消息。綜觀其他博客,auto.offset.reset應設置爲「最小」,以能夠得到的所有消息:Kafka高級消費者使用Java API從主題獲取所有消息(等效於 - from-beginning)

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", a_zookeeper); 
    props.put("group.id", a_groupId); 
    props.put("auto.offset.reset", "smallest"); 
    props.put("zookeeper.session.timeout.ms", "10000");  

    return new ConsumerConfig(props); 
} 

這個問題我真的是這樣的:什麼是等價的Java API調用高層次的消費是等價的:

斌/ kafka-console-consumer.sh --zookeeper本地主機:2181 --topic測試--from-開始

回答

4

看起來你需要使用「低水平SimpleConsumer API「

對於大多數應用程序來說,高級用戶API足夠好。 某些應用程序想要尚未暴露給高級消費者 的功能(例如,在重新啓動消費者時設置初始偏移量)。他們可以用 代替使用我們的低級SimpleConsumer Api。該邏輯將會有點複雜,您可以按照here中的示例進行操作。

此示例工作了讓所有的消息從一個話題下列參數:(注意,該端口是卡夫卡的端口,而不是ZooKeeper的端口,從this example設立主題):

10 my-replicated-topic 0 localhost 9092 

具體而言,是獲得readOffset的方法,其需要kafka.api.OffsetRequest.EarliestTime():

long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); 

這裏是另一篇文章可能會提供一些關於如何排序的替代想法:How to get data from old offset point in Kafka?

+0

你用什麼來實現它?閱讀主題中的所有消息。 – Samra

6

基本上,每當新消費者嘗試使用某個主題時,它就會從頭開始讀取消息。如果您爲了測試的目的每次都特別從頭開始消費,那麼每次您使用新的groupID初始化消費者時,它都會從頭開始讀取消息。以下是我如何做到的:

properties.put("group.id", UUID.randomUUID().toString()); 

並且每次從頭讀取消息!

+0

謝謝!需要它用於測試目的。我想這是因爲你可以重複使用相同的數據用於不同的目的? – user1758777

+0

@ user1758777是的,我需要每個不同的測試來處理相同的數據。 –

2

要提取從一開始的消息,你可以這樣做:

import kafka.utils.ZkUtils; 
ZkUtils.maybeDeletePath("zkhost:zkport", "/consumers/group.id"); 

後按照日常工作......

0
Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("auto.offset.reset", "earliest"); 
props.put("group.id", UUID.randomUUID().toString()); 

此屬性將幫助你。

相關問題