2017-08-21

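Multiple consumers using Spring Kafka

I am trying to set up multiple listeners on a Kafka topic in my application. My setup is below. The topic should be consumed by two groups, but only one listener is consuming it. What am I missing here?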

@Bean 
public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<String, Object>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName); 

    return props; 
} 

@Bean 
public ConsumerFactory<String, String> consumerFactory() { 
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory(consumerConfigs()); 
    return consumerFactory; 
} 

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 
    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@Bean("notificationFactory") 
public ConcurrentKafkaListenerContainerFactory<String, String> notificationFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 
    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@Bean("insertContainerFactory") 
public ConcurrentKafkaListenerContainerFactory<String, String> insertContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 

    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", group = "insert_listener", containerFactory = "insertContainerFactory") 
public void receiveForInsert(String message) { 
    locationProcessor.insertLocationData(message); 
} 

@KafkaListener(id = "notification_listener", topics = "${kafka.topic.readlocation}", group = "notification_listener",containerFactory="notificationFactory") 
public void receiveForNotification(String message) { 
    locationProcessor.processNotificationMessage(message); 
} 

Edit: the following works:

@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", groupId = "insert_listener") 
public void receiveForInsert(String message) { 
    locationProcessor.insertLocationData(message); 
} 

Answer

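You need a different group.id for each listener; the group attribute is not the group.id (see the javadocs). In the upcoming 1.3 release there is a new groupId attribute, and we can also use the id as the group, if present.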

For earlier versions, you need a separate consumer factory for each listener.
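
For versions before 1.3, one way to give each listener its own group.id is to derive a per-group copy of the shared consumer properties and build one consumer factory from each copy. The sketch below shows only the property-copying step in plain Java. The GroupConfigs class and its forGroup method are hypothetical names, not part of Spring Kafka, and the literal "group.id" key is the Kafka consumer property that ConsumerConfig.GROUP_ID_CONFIG refers to.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Spring Kafka class): builds per-group consumer properties.
final class GroupConfigs {

    private GroupConfigs() {
    }

    // Returns a copy of the shared properties with "group.id" overridden.
    // The base map is left untouched, so it can be reused for other groups.
    static Map<String, Object> forGroup(Map<String, Object> base, String groupId) {
        Map<String, Object> props = new HashMap<>(base);
        props.put("group.id", groupId);
        return props;
    }
}
```

Under that assumption, insertContainerFactory could be given new DefaultKafkaConsumerFactory<>(GroupConfigs.forGroup(consumerConfigs(), "insert_listener")), and notificationFactory the same with "notification_listener", so that the two listeners no longer share a single group.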

Worked perfectly. Thanks. I upgraded to version 1.3 and used group.id.
