2013-07-31 169 views
5

我是卡夫卡新手,我對消費者的理解是基本上有兩種類型的實現。
1)The High level consumer/consumer group
2)Simple ConsumerApache Kafka消費羣體和簡單消費者

對高層次的抽象最重要的部分是,當卡夫卡不關心處理它使用的,而簡單的消費提供了偏移管理更好地控制所抵消。令我迷惑的是如果我想在多線程環境中運行消費者,並且想要控制偏移量。如果我使用消費者組,這是否意味着我必須從存儲在zookeeper中的最後一個偏移量中讀取?這是我擁有的唯一選擇。

回答

6

絕大多數情況下,高級消費者API不會讓您直接控制偏移量。

首次創建使用者組時,可以通過使用auto.offset.reset屬性來確定是否以kafka存儲的最早或最新消息開始。

您還可以通過將auto.commit.enable設置爲false來控制何時高級別消費者向動物園管理員提交新的偏移量。

由於高級消費者將偏移量存儲在zookeeper中,您的應用程序可以直接訪問zookeeper並操縱偏移量 - 但它將超出高級消費者API。

你的問題有點混亂,但你可以在多線程環境中使用簡單的使用者。這就是高級消費者所做的。

0

在Apache Kafka 0.9和0.10中,消費者組管理由Broker(用於協調)和主題(用於狀態存儲)完全在Kafka應用程序內處理。

當消費者組第一預訂主題的auto.offset.reset的設置確定,消費者開始消耗消息(http://kafka.apache.org/documentation.html#newconsumerconfigs

當特定消費者分配主題/分區可以將ConsumerRebalanceListener註冊以接收通知。

消費者運行後,可以使用seekseekToBeginningseekToEnd從特定偏移量中獲取消息。 seek會影響消費者下一poll,並保存在下一提交(如commitSynccommitAsync或當auto.commit.interval流逝,如果啓用。)

消費者的javadoc提及更具體的情況:http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

您可以將卡夫卡提供的組管理與通過查詢(..)手動管理偏移進行組合,一旦分區分配完成。