0

我正在使用以下配置來將activemq與kafka集成。我從activemq收到消息並將其轉發給kafka。但是,我注意到消息正從JMS隊列中出列,但消息不會發送到卡夫卡。Spring集成 - Apache ActiveMQ到Kafka

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/integration/jms" 
    xmlns:integration="http://www.springframework.org/schema/integration" 
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
    http://www.springframework.org/schema/integration/kafka 
    http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd"> 

    <jms:message-driven-channel-adapter 
     id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory" 
     channel="helloChannel" extract-payload="true" /> 

    <integration:channel id="helloChannel" /> 

    <integration:service-activator id="sayHelloServiceActivator" 
     input-channel="helloChannel" ref="sayHelloService" method="sayHello" /> 

    <int-kafka:outbound-channel-adapter 
     id="kafkaOutboundChannelAdapter" kafka-template="template" 
     auto-startup="false" sync="true" channel="helloChannel" topic="test1234" 
     > 
    </int-kafka:outbound-channel-adapter> 

    <bean id="template" class="org.springframework.kafka.core.KafkaTemplate"> 
     <constructor-arg> 
      <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
       <constructor-arg> 
        <map> 
         <entry key="bootstrap.servers" value="localhost:9092" /> 
         <!--entry key="retries" value="5" /> <entry key="batch.size" value="16384" 
          /> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432" 
          /> < entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" 
          /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" 
          /--> 
        </map> 
       </constructor-arg> 
      </bean> 
     </constructor-arg> 
    </bean> 



</beans> 

此外,如果有任何來自Kafka的問題,它甚至不報告任何異常堆棧跟蹤。

我錯過了什麼嗎?

+0

我不太瞭解Spring,但在Kafka中,您需要配置acks = all以便在經紀商存儲您的消息時獲得確認。如果acks = 0,您將不會確認成功或失敗提交消息。 –

回答

0

您的消息由sayHelloServiceActivator消耗。

所以你helloChannel渠道類型更改爲

<publish-subscribe-channel id="helloChannel"/> 

默認爲DirectChannel

的DirectChannel具有點至點語義但除此之外更 類似PublishSubscribeChannel比任何隊列基於上述的信道實現。它實現了SubscribableChannel接口 而不是PollableChannel接口,因此它將消息直接發送給訂閱者。但是,作爲 點對點信道,它不同於 PublishSubscribeChannel,因爲它只會將每個消息發送到 單訂閱MessageHandler。

+0

感謝您的評論。我注意到我的serviceActivator是無效類型,因此,它不會前進。我現在可以將消息從jms發送到kafka。感謝您的投入。 –

0

正如@哈森Bennour說,如果你想發送消息給兩個消費者,你需要一個發佈/訂閱頻道。

也就是說,你在kafka適配器上有auto-startup="false",所以它甚至不會訂閱該頻道。

如果它已啓動,則使用您當前的配置消息將循環發送到服務激活器和適配器。

+0

感謝您的評論。我注意到我的serviceActivator是無效類型,因此,它不會前進。我現在可以將消息從jms發送到kafka。感謝您的投入。 –