2017-09-19 52 views

回答

3

@KafkaListener要求KafkaListenerContainerFactory@Bean,而這又是基於ConsumerFactory。而DefaultKafkaConsumerFactory接受消費者CONFIGS的Map<String, Object>

@Configuration 
@EnableKafka 
public class KafkaConfig { 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> 
         kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
           new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<Integer, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); 
     ... 
     return props; 
    } 
} 

https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#__kafkalistener_annotation

如果這一ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG是完全標準的Apache卡夫卡bootstrap.serversproperty

主機/端口對用於建立名單與Kafka集羣的初始連接。客戶端將使用所有服務器,而不管在此指定哪些服務器用於引導 - 此列表僅影響用於發現全套服務器的初始主機。此列表應採用host1:port1,host2:port2,...的形式。由於這些服務器僅用於初始連接以發現完整集羣成員資格(可能會動態更改),因此此列表不需包含完整集合的服務器(不過,如果服務器停機,您可能需要多個服務器)。

不,您不能指向Zookeeper地址。卡夫卡不再支持這一點。

+1

新的(0.9+)客戶端目標是避免客戶端代碼必須與動物園管理員交談。但是,您可以(我假設)自己查詢zookeeper以獲取代理列表並填充引導程序服務器配置屬性。 –

+0

謝謝阿爾喬姆。另外我相信Kafka有一個叫做_consumer-offset的話題,用它來處理服務器和客戶端之間的偏移量管理。所以如果我使用與多個zookeepers對應的多個服務器,Will spring-kafka會特別處理偏移量管理。 – daemon54

+1

春季卡夫卡不管理偏移量。一切都是在卡夫卡經紀人完成的。我們只提供'consumer.group'並依賴爲Consumer實例提供的偏移量。 –

相關問題