我有一個偵聽器需要從具有相同主題的多個kafka服務器讀取,這些服務器都在一個zookeeper下配置。我如何從這些多臺服務器上讀取數據。你能幫忙嗎?如何使用spring-kafka爲偵聽器傳遞多個引導程序服務器
取而代之的是卡夫卡服務器,我可以指向zookeeper嗎?
我有一個偵聽器需要從具有相同主題的多個kafka服務器讀取,這些服務器都在一個zookeeper下配置。我如何從這些多臺服務器上讀取數據。你能幫忙嗎?如何使用spring-kafka爲偵聽器傳遞多個引導程序服務器
取而代之的是卡夫卡服務器,我可以指向zookeeper嗎?
該@KafkaListener
要求KafkaListenerContainerFactory
@Bean
,而這又是基於ConsumerFactory
。而DefaultKafkaConsumerFactory
接受消費者CONFIGS的Map<String, Object>
:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
...
return props;
}
}
如果這一ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
是完全標準的Apache卡夫卡bootstrap.servers
property:
主機/端口對用於建立名單與Kafka集羣的初始連接。客戶端將使用所有服務器,而不管在此指定哪些服務器用於引導 - 此列表僅影響用於發現全套服務器的初始主機。此列表應採用host1:port1,host2:port2,...的形式。由於這些服務器僅用於初始連接以發現完整集羣成員資格(可能會動態更改),因此此列表不需包含完整集合的服務器(不過,如果服務器停機,您可能需要多個服務器)。
不,您不能指向Zookeeper地址。卡夫卡不再支持這一點。
新的(0.9+)客戶端目標是避免客戶端代碼必須與動物園管理員交談。但是,您可以(我假設)自己查詢zookeeper以獲取代理列表並填充引導程序服務器配置屬性。 –
謝謝阿爾喬姆。另外我相信Kafka有一個叫做_consumer-offset的話題,用它來處理服務器和客戶端之間的偏移量管理。所以如果我使用與多個zookeepers對應的多個服務器,Will spring-kafka會特別處理偏移量管理。 – daemon54
春季卡夫卡不管理偏移量。一切都是在卡夫卡經紀人完成的。我們只提供'consumer.group'並依賴爲Consumer實例提供的偏移量。 –