2016-09-19 138 views
4

我有一個主題列表(現在它是10),其大小可以在未來增加。我知道我們可以產生多個線程(每個主題)來消費每個主題,但在我的情況下,如果主題數量增加,那麼消耗主題的線程數就會增加,這是我不想要的,因爲主題不是會頻繁獲取數據,因此這些線程將處於理想狀態。卡夫卡消費者爲多個主題

是否有任何方法可讓單個消費者從所有主題中消費?如果是的話,那我們該如何實現呢? Kafka也將如何維持抵消?請提出答案。

回答

4

我們可以通過下面的API爲多個主題訂閱: consumer.subscribe(Arrays.asList(TOPIC1,標題2),ConsumerRebalanceListener OBJ)

消費者對該主題的信息,我們可以使用COMIT或consumer.commitAsync消費者.commitSync()通過創建OffsetAndMetadata對象,如下所示。

ConsumerRecords<String, String> records = consumer.poll(long value); 
for (TopicPartition partition : records.partitions()) { 
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
     System.out.println(record.offset() + ": " + record.value()); 
    } 
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
} 
+0

我知道,我們可以,但卡夫卡如何保持膠印?另外,擁有一個消費者羣體能夠解決我的問題嗎? – Apollo

+1

偏移量由您的應用提交併存儲在一個名爲__consumer_offsets的特殊偏移量kafka主題中。爲每個主題的每個分區保留偏移量,因此與您訂閱的主題數量無關。 –

1

不需要多個線程,您可以讓一個消費者從多個主題中消費。 偏移量由zookeeper維護,因爲kafka-server本身是無狀態的。 每當消費者消費一條消息時,其偏移量就會與動物園管理員合作,以保留將來的消息只處理每條消息一次。因此,即使在kafka失敗的情況下,消費者也會從最後一次提交的抵消開始消費。

+1

從Kafka 0.9及以上版本開始,偏移量將存儲在Kafka主題中,而不是zookeeper –