我正在使用以下配置來將activemq與kafka集成。我從activemq收到消息並將其轉發給kafka。但是,我注意到消息正從JMS隊列中出列,但消息不會發送到卡夫卡。Spring集成 - Apache ActiveMQ到Kafka
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
<jms:message-driven-channel-adapter
id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory"
channel="helloChannel" extract-payload="true" />
<integration:channel id="helloChannel" />
<integration:service-activator id="sayHelloServiceActivator"
input-channel="helloChannel" ref="sayHelloService" method="sayHello" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-template="template"
auto-startup="false" sync="true" channel="helloChannel" topic="test1234"
>
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<!--entry key="retries" value="5" /> <entry key="batch.size" value="16384"
/> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432"
/> < entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"
/> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"
/-->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
</beans>
此外,如果有任何來自Kafka的問題,它甚至不報告任何異常堆棧跟蹤。
我錯過了什麼嗎?
我不太瞭解Spring,但在Kafka中,您需要配置acks = all以便在經紀商存儲您的消息時獲得確認。如果acks = 0,您將不會確認成功或失敗提交消息。 –