2015-09-20 104 views
1

要求:期望在 inMessageHandler 中接收並處理 inMessage,在 emailMessageHandler 中接收並處理 emailMessage。

(標題:SI Kafka——基於路由條件的消息流)

問題一(兩個消費者配置使用不同的 group-id 時):消息能夠一路到達 ConsumerServiceHandler 的轉換方法,但 emailMessageHandler 收不到 emailMessage(inMessageHandler 則能正常收到 inMessage)。

問題二(兩個消費者配置使用相同的 group-id 時):emailMessage 能夠從 ConsumerServiceHandler 的轉換方法流到 emailMessageHandler,但 inMessageHandler 完全收不到 inMessage(inMessage 甚至沒有到達 ConsumerServiceHandler 的轉換方法)。

能否請您指出這裏有什麼問題?如何根據主題(topic)把不同的消息交給不同的服務類處理?

請找到下面的配置。

<!-- Kafka producer context: one producer configuration per outgoing topic.
     NOTE(review): the first topic is spelled "emailMessag_topic", while the router mapping
     elsewhere in this file uses "emailMessage_topic"; confirm which spelling is intended. -->
<int-kafka:producer-context id="kafkaProducerContext">
  <int-kafka:producer-configurations>
    <!-- Producer for EmailMessageVo payloads. -->
    <int-kafka:producer-configuration
        broker-list="localhost:9092"
        topic="emailMessag_topic"
        key-class-type="java.lang.String"
        key-encoder="kafkaSerializer"
        value-class-type="com.test.EmailMessageVo"
        value-encoder="emailvalueEncoder"
        compression-type="none"/>
    <!-- Producer for InMessageVo payloads. -->
    <int-kafka:producer-configuration
        broker-list="localhost:9092"
        topic="inMessage_topic"
        key-class-type="java.lang.String"
        key-encoder="kafkaSerializer"
        value-class-type="com.test.InMessageVo"
        value-encoder="invalueEncoder"
        compression-type="none"/>
  </int-kafka:producer-configurations>
</int-kafka:producer-context>



<!-- Value encoder for InMessageVo payloads; referenced by value-encoder="invalueEncoder". -->
<bean id="invalueEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
  <constructor-arg value="com.test.InMessageVo"/>
</bean>

<!-- Value encoder for EmailMessageVo payloads; referenced by value-encoder="emailvalueEncoder". -->
<bean id="emailvalueEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
  <constructor-arg value="com.test.EmailMessageVo"/>
</bean>

<!-- Polls consumerContext and places what it reads on the inputFromKafka channel.
     NOTE(review): auto-startup is "false", so something else must start this adapter
     explicitly; confirm that this happens. -->
<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter"
    channel="inputFromKafka"
    kafka-consumer-context-ref="consumerContext"
    auto-startup="false">
  <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>



<!-- Queue-backed channel that receives the output of kafkaInboundChannelAdapter. -->
<int:channel id="inputFromKafka">
  <int:queue/>
</int:channel>


<!-- Queue-backed channel used as the input-channel of the topic router. -->
<int:channel id="receiveMessageFromKafka">
  <int:queue/>
</int:channel>



    <!-- Kafka consumer context with one consumer configuration per message type.
         NOTE(review): the topic ids here are "test1" and "test2", whereas the producer
         configurations and the router in this file use "emailMessag_topic",
         "inMessage_topic" and "emailMessage_topic"; confirm the names line up. -->
    <int-kafka:consumer-context
        id="consumerContext"
        consumer-timeout="1000"
        zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
      <int-kafka:consumer-configurations>
        <!-- Consumer decoding values with emailvalueDecoder. -->
        <int-kafka:consumer-configuration
            group-id="default1"
            key-decoder="kafkaReflectionDecoder"
            value-decoder="emailvalueDecoder"
            max-messages="5000">
          <int-kafka:topic id="test1" streams="4"/>
        </int-kafka:consumer-configuration>

        <!-- Consumer decoding values with invalueDecoder. -->
        <int-kafka:consumer-configuration
            group-id="default2"
            key-decoder="kafkaReflectionDecoder"
            value-decoder="invalueDecoder"
            max-messages="50">
          <int-kafka:topic id="test2" streams="4"/>
        </int-kafka:consumer-configuration>
      </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>


    <!-- Value decoder for EmailMessageVo payloads; referenced by value-decoder="emailvalueDecoder".
         NOTE(review): the matching encoder bean in this file is the Reflect-datum variant,
         while this decoder is the Specific-datum variant; confirm the pairing is intended. -->
    <bean id="emailvalueDecoder"
          class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaDecoder">
      <constructor-arg value="com.test.EmailMessageVo"/>
    </bean>


    <!-- Value decoder for InMessageVo payloads; referenced by value-decoder="invalueDecoder".
         NOTE(review): the matching encoder bean in this file is the Reflect-datum variant,
         while this decoder is the Specific-datum variant; confirm the pairing is intended. -->
    <bean id="invalueDecoder"
          class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaDecoder">
      <constructor-arg value="com.test.InMessageVo"/>
    </bean>
    <!-- Handler bean referenced by the advicedSa service activator. -->
    <bean id="consumerService"
          class="com.test.ConsumerServiceHandler"/>

    <!-- Passes each message from inputFromKafka to consumerService.testConsumerCircuitBreaker
         and sends the result to receiveMessageFromKafka.
         Fixes: whitespace between the ref and method attributes, the attribute name
         output-channel (was "outputchannel"), and the missing element close. -->
    <int:service-activator id="advicedSa"
                           input-channel="inputFromKafka"
                           ref="consumerService"
                           method="testConsumerCircuitBreaker"
                           output-channel="receiveMessageFromKafka"/>

    <!-- Queue-backed channel consumed by the advicedSa1 service activator. -->
    <int:channel id="inMessage_topic_channel">
      <int:queue/>
    </int:channel>

<!-- Queue-backed channel consumed by the advicedSa2 service activator. -->
<int:channel id="emailMessage_topic_channel">
  <int:queue/>
</int:channel>

<!-- Handler bean referenced by the advicedSa1 service activator. -->
<bean id="inMessageService"
      class="com.test.inMessageHandler"/>

    <!-- Handler bean referenced by the advicedSa2 service activator. -->
    <bean id="emailMessageService"
          class="com.test.emailMessageHandler"/>

     <!-- Invokes inMessageService.execute for each message on inMessage_topic_channel.
          Fixes: whitespace between the ref and method attributes, and the missing element close. -->
     <int:service-activator id="advicedSa1"
                            input-channel="inMessage_topic_channel"
                            ref="inMessageService"
                            method="execute"/>

     <!-- Invokes emailMessageService.execute for each message on emailMessage_topic_channel.
          Fixes: whitespace between the ref and method attributes, and the missing element close. -->
     <int:service-activator id="advicedSa2"
                            input-channel="emailMessage_topic_channel"
                            ref="emailMessageService"
                            method="execute"/>

     <!-- Routes messages from receiveMessageFromKafka by the value of their "topic" header.
          Fix: the router element is now closed.
          NOTE(review): this relies on an upstream component setting the "topic" header to one
          of the mapped values; confirm against ConsumerServiceHandler. -->
     <int:router input-channel="receiveMessageFromKafka" expression="headers.topic">
       <int:mapping value="inMessage_topic" channel="inMessage_topic_channel"/>
       <int:mapping value="emailMessage_topic" channel="emailMessage_topic_channel"/>
     </int:router>

回答

0

您應該讓我們看看您的 ConsumerServiceHandler 做了什麼,因爲您的邏輯看起來有點奇怪:<int-kafka:inbound-channel-adapter> 產生的結果是 Message<Map<String, Map<Integer, List<Object>>>>,也就是說,它是一個 Message<?>,其有效載荷是一個以主題爲鍵的 Map,每個主題下再按分區存放各自的消息列表。

如果使用相同的 group-id,我們(你們?)就會遇到這樣的問題:最後一個配置會覆蓋前面的配置:

// Stores the bean definition under the value of the "group-id" attribute.
// NOTE(review): presumably consumerConfigurationsMap is a java.util.Map, in which case a second
// configuration with the same group-id replaces the first; confirm against the parser source.
consumerConfigurationsMap.put(consumerConfiguration.getAttribute("group-id"), 
       consumerConfigurationBeanDefinition); 
+0

感謝 @ArtemBilan 的回覆。請在這裏找到 ConsumerServiceHandler:https://gist.github.com/anonymous/d8cf04dcabd8e4463d33 。ConsumerServiceHandler 的作用:去除多餘的主題/分區細節,並設置主題。發送消息時使用的是 Message.withPayload(inMessageVO)/Message.withPayload(emailMessageVO),本來期望收到同樣的格式,但實際收到的是 Message<Map<String, Map<Integer, List<Object>>>>。整體意圖:將不同的消息(inMessageVO/emailMessageVO)路由到不同的處理程序進行處理。如果有更好的做法,請給予建議。 – sam