要求:期望在inMessageHandler中接收並處理inMessage,在emailMessageHandler中接收並處理emailMessage。(標題:SI Kafka - 基於路由器條件的消息流)
問題一:(如果各消費者配置使用不同的消費者組ID)消息能一直流到ConsumerServiceHandler的變換(transform)方法,但emailMessageHandler中收不到emailMessage(inMessageHandler中則能收到inMessage)。
問題二:(如果各消費者配置使用相同的消費者組ID)消息能從ConsumerServiceHandler的變換(transform)方法流到emailMessageHandler,emailMessage可以收到;但inMessageHandler中完全收不到inMessage(甚至在ConsumerServiceHandler的變換方法中也收不到)。
能否請您指出這裏有什麼問題?如何根據主題(topic)ID,讓不同的服務類分別接收並處理不同的消息?
請找到下面的配置。
<!-- Kafka producer context: one producer configuration per topic, each with its own
     value encoder. Both configurations publish to the broker at localhost:9092 with
     String keys and no compression. -->
<int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <!-- EmailMessageVo payloads. The topic name is spelled "emailMessage_topic" so that
             it agrees with the value used by the header router mapping (it was previously
             misspelled as "emailMessag_topic"). -->
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          key-class-type="java.lang.String"
                                          value-class-type="com.test.EmailMessageVo"
                                          topic="emailMessage_topic"
                                          value-encoder="emailvalueEncoder"
                                          key-encoder="kafkaSerializer"
                                          compression-type="none"/>
        <!-- InMessageVo payloads, published to "inMessage_topic". -->
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          key-class-type="java.lang.String"
                                          value-class-type="com.test.InMessageVo"
                                          topic="inMessage_topic"
                                          value-encoder="invalueEncoder"
                                          key-encoder="kafkaSerializer"
                                          compression-type="none"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
<!-- Avro reflection-based value encoders referenced by the producer configurations.
     Each <bean> is now explicitly closed; previously the closing tags were missing,
     which made the document ill-formed. -->
<bean id="invalueEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="com.test.InMessageVo"/>
</bean>
<bean id="emailvalueEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="com.test.EmailMessageVo"/>
</bean>
<!-- Polled Kafka inbound adapter: reads from the "consumerContext" consumer context and
     publishes to the "inputFromKafka" channel, polling with a 1 ms fixed delay.
     auto-startup is "false", so something else must start this adapter explicitly. -->
<int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter"
        channel="inputFromKafka"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="false">
    <int:poller time-unit="MILLISECONDS" fixed-delay="1"/>
</int-kafka:inbound-channel-adapter>
<!-- Queue-backed channel fed by the Kafka inbound channel adapter. -->
<int:channel id="inputFromKafka">
    <int:queue/>
</int:channel>

<!-- Queue-backed channel that is the input of the topic header router. -->
<int:channel id="receiveMessageFromKafka">
    <int:queue/>
</int:channel>
<!-- Kafka consumer context used by the inbound channel adapter. Each consumer
     configuration has its own group id and value decoder and reads one topic with
     4 streams.

     The topic ids are aligned with the topics the producer configurations publish to and
     with the values the header router maps on ("emailMessage_topic" / "inMessage_topic").
     They were previously "test1" / "test2", which neither the producers nor the router
     referenced.
     NOTE(review): confirm these are the real topic names in the target environment. -->
<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="1000"
                            zookeeper-connect="zookeeperConnect"
                            consumer-properties="consumerProperties">
    <int-kafka:consumer-configurations>
        <!-- Email messages: decoded with emailvalueDecoder. -->
        <int-kafka:consumer-configuration group-id="default1"
                                          value-decoder="emailvalueDecoder"
                                          key-decoder="kafkaReflectionDecoder"
                                          max-messages="5000">
            <int-kafka:topic id="emailMessage_topic" streams="4"/>
        </int-kafka:consumer-configuration>
        <!-- In messages: decoded with invalueDecoder. -->
        <int-kafka:consumer-configuration group-id="default2"
                                          value-decoder="invalueDecoder"
                                          key-decoder="kafkaReflectionDecoder"
                                          max-messages="50">
            <int-kafka:topic id="inMessage_topic" streams="4"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<!-- Avro value decoders referenced by the consumer configurations, one per payload type.
     NOTE(review): the producer side encodes with AvroReflectDatumBackedKafkaEncoder while
     these are the "Specific" decoder variant; confirm the VO classes are Avro-generated
     specific records, otherwise the reflect decoder is presumably the intended pair. -->
<bean id="emailvalueDecoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaDecoder">
    <constructor-arg value="com.test.EmailMessageVo"/>
</bean>

<bean id="invalueDecoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaDecoder">
    <constructor-arg value="com.test.InMessageVo"/>
</bean>
<!-- Handler invoked for every message arriving from the Kafka inbound adapter. -->
<bean id="consumerService" class="com.test.ConsumerServiceHandler"/>

<!-- Passes messages from "inputFromKafka" to consumerService.testConsumerCircuitBreaker
     and sends the result to "receiveMessageFromKafka".
     Fixes: whitespace added between the ref and method attributes, the misspelled
     "outputchannel" attribute corrected to "output-channel", and the element closed.
     NOTE(review): "inputFromKafka" is a queue channel, so this endpoint needs a poller;
     none is declared here, so confirm a default poller is defined elsewhere. -->
<int:service-activator id="advicedSa"
                       input-channel="inputFromKafka"
                       ref="consumerService"
                       method="testConsumerCircuitBreaker"
                       output-channel="receiveMessageFromKafka"/>
<!-- Queue-backed channels, one per topic value the header router maps to.
     Each <int:channel> is now explicitly closed; the closing tags were missing. -->
<int:channel id="inMessage_topic_channel">
    <int:queue/>
</int:channel>
<int:channel id="emailMessage_topic_channel">
    <int:queue/>
</int:channel>
<!-- Per-topic handlers and the service activators that invoke their execute method.
     Fixes: whitespace added between the ref and method attributes, and both
     service-activator elements closed (they were left open).
     NOTE(review): both input channels are queue channels, so these endpoints need a
     poller; none is declared here, so confirm a default poller is defined elsewhere. -->
<bean id="inMessageService" class="com.test.inMessageHandler"/>
<bean id="emailMessageService" class="com.test.emailMessageHandler"/>

<int:service-activator id="advicedSa1"
                       input-channel="inMessage_topic_channel"
                       ref="inMessageService"
                       method="execute"/>
<int:service-activator id="advicedSa2"
                       input-channel="emailMessage_topic_channel"
                       ref="emailMessageService"
                       method="execute"/>
<!-- Routes messages from "receiveMessageFromKafka" by the value of the "topic" message
     header: each mapped value is sent to its own channel.
     Fix: the closing </int:router> tag was missing.
     NOTE(review): this relies on an upstream component setting the "topic" header;
     verify that the consumer service handler does so for every outgoing message. -->
<int:router input-channel="receiveMessageFromKafka" expression="headers.topic">
    <int:mapping value="inMessage_topic" channel="inMessage_topic_channel"/>
    <int:mapping value="emailMessage_topic" channel="emailMessage_topic_channel"/>
</int:router>
感謝@ArtemBilan的熱心回覆。請在這裏查看ConsumerServiceHandler:https://gist.github.com/anonymous/d8cf04dcabd8e4463d33 。ConsumerServiceHandler的作用:去除主題/分區的冗餘細節,並設置主題(topic)。消息發送方式:Message.withPayload(inMessageVO)/Message.withPayload(emailMessageVO),並期望收到相同的格式,但實際收到的是這種格式:Message<Map<String, Map<Integer, List