2017-09-19 97 views
1

我有其中userid將被刪除主題名稱「用戶」相同的消息,我想從這個主題和流程如下功能春季Kafka Listner |閱讀

1. process user leave data 
2. process user salary data 

我想有兩個監聽器指向同一主題和閱讀閱讀相同的用戶ID並且開始並行處理。

@KafkaListener(topics = "${kafka.topic.user}",group="abc")) 
      public void receive(String message) { 

        userService.processLeave(message); 
       } 

@KafkaListener(topics = "${kafka.topic.user}",group="abc1")) 
      public void receive1(String message) { 

        userService.processPayRoll(message); 
       } 

,但所有的時間我看到 - processPayRoll是越來越調用所有的時間。

我失蹤了什麼?

回答

1

那麼,看起來像你使用舊的春季卡夫卡版本。

不幸的是,group與消費者的group.id無關。 這是用於生命週期管理的containerGroup

您應該考慮根據不同的消費配置配置不同的KafkaMessageListenerContainer。在那裏你已經會配置不同的ConsumerConfig.GROUP_ID_CONFIG

最新版本具有@KafkaListener結構,如:

/** 
* If provided, the listener container for this listener will be added to a bean 
* with this value as its name, of type {@code Collection<MessageListenerContainer>}. 
* This allows, for example, iteration over the collection to start/stop a subset 
* of containers. 
* @return the bean name for the group. 
*/ 
String containerGroup() default ""; 

/** 
* Override the {@code group.id} property for the consumer factory with this value 
* for this listener only. 
* @return the group id. 
* @since 1.3 
*/ 
String groupId() default ""; 

/** 
* When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if 
* provided) as the {@code group.id} property for the consumer. Set to false, to use 
* the {@code group.id} from the consumer factory. 
* @return false to disable. 
* @since 1.3 
*/ 
boolean idIsGroup() default true; 
+0

的1.3.0.RC1候選發佈版是在春天的里程碑式回購現已;該發佈預計將在本月底結束。 –