2017-04-11 40 views
0

我使用SpringXD,我有以下配置:SpringXD:自動啓動=虛假不工作的卡夫卡消息驅動通道適配器

  • 彈簧集成 - 卡夫卡2.1.0.RELEASE
  • 卡夫卡客戶0.10.0.1
  • 卡夫卡0.10.xx
  • 彈簧XD-1.3.1.RELEASE

我在我的xml文件以下配置:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> 


    <int:channel id="input" /> 
    <int:channel id="output" /> 

    <int:control-bus input-channel="input" /> 

    <int-kafka:message-driven-channel-adapter 
     id="kafka-inbound-channel-adapter-testing" listener-container="container1" 
     auto-startup="false" phase="100" send-timeout="5000" 
     channel="output" mode="record" 
     message-converter="messageConverter" /> 

    <bean id="messageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter" /> 

    <!--Consumer --> 
    <bean id="container1" 
     class="org.springframework.kafka.listener.KafkaMessageListenerContainer"> 
     <constructor-arg> 
      <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 
       <constructor-arg> 
        <map> 
         <entry key="bootstrap.servers" value="localhost:9092" /> 
         <entry key="enable.auto.commit" value="false" /> 
         <entry key="auto.commit.interval.ms" value="100" /> 
         <entry key="session.timeout.ms" value="15000" /> 
         <entry key="max.poll.records" value="3" /> 
         <entry key="group.id" value="bridge-stream-testing" /> 
         <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" /> 
         <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> 
        </map> 
       </constructor-arg> 
      </bean> 
     </constructor-arg> 

     <constructor-arg> 
      <bean class="org.springframework.kafka.listener.config.ContainerProperties"> 
       <constructor-arg name="topics" value="testing-topic" /> 
      </bean> 
     </constructor-arg> 
    </bean> 

</beans> 

這是我使用啓動Java類/停止頻道:

package com.kafka.source.logic; 

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.ImportResource; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.support.GenericMessage; 
import org.springframework.scheduling.annotation.EnableScheduling; 
import org.springframework.scheduling.annotation.Scheduled; 

@Configuration 
@EnableScheduling 
@ImportResource("classpath:/config/kafka-source-context.xml") 
public class KafkaSourceRetry { 

    @Autowired 
    MessageChannel input; 

    @Scheduled(cron="*/50 * * * * *") 
    void startAdapter(){ 
     //CODE COMMENTED OUT TO MAKE SURE THE ADAPTER IS NOT BEING STARTED 
     //EVEN IF I UNCOMMENT THE CODE, THE 50 secs defined related to the cron are not respected. 
     //That is, if I send a message to the topic, it is inmediately consumed 
     //input.send(new GenericMessage<String>("@kafka-inbound-channel-adapter-testing.start()")); 
    } 
} 

然後,我創建了一個基本流,以檢查是否存在,我傳送給該主題的消息通過

stream create --name bridgeStream --definition "kafkaSourceLatestApi_v2|bridge|file" --deploy 

來了我檢查時創建的文件,它包含了所有我發送到卡夫卡主題的消息:

hola_que_tal que_bonito bridgeStream.out(END)

另外,在我發現這個日誌:

2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 在相 開始豆0 2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 啓動類型的豆 'container1'[類 有機。彈簧framework.kafka.listener.KafkaMessageListenerContainer] 2017-04-10T22:37:06-0300 1.3.1.RELEASE調試 DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 成功啓動Bean'container1'2017-04-10T22:37:06 -0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 啓動豆在階段100 2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 啓動豆'類型 org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter'的'kafka-inbound-channel-adapter-testing' 2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache- 0 inbound.KafkaMessageDrivenChannelAdapter - 開始 卡夫卡入站通道適配器 - 測試2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - 成功啓動豆 「kafka-入站通道適配器測試」

我的問題是:爲什麼通道會自動啓動?

回答

1

它的設計,所有模塊都將自動啓動設置爲false,因此它們不會不按順序啓動;當您部署流時,各個模塊將部署並從右至左啓動。

部署/取消部署是啓動/停止流的方式。

See the ModuleDeployer

+0

其實我不想部署/取消部署流。我想要做的就是手動停止/啓動頻道(所以這就是我使用'message-driven-channel-adapter'的原因,但是看起來我甚至沒有手動啓動它(使用'@ kafka-入站通道適配器testing.start()')它只是自己開始的 – Columb1a

+1

我不明白,如果你不部署模塊,你怎麼能停止/啓動它?它不是「開始自己的「Spring XD start()它部署後,你可以停止/開始使用JMX,但是沒有辦法阻止XD在部署過程中啓動它,如果你沒有部署到XD,爲什麼使用XD模塊?你可以直接使用spring-integration-kafka適配器,但是你的問題說你正在部署一個流'stream create -name bridgeStream --definition「kafkaSourceLatestApi_v2 | bridge | file」--deploy' - 所以你的評論沒有意義 –

+1

如果你的問題是「我可以使用標準的Spring Integration組件部署一個流而不啓動它」,答案是n O操作。您可以編寫一個忽略'start()'並使用其他一些機制來啓動的自定義適配器。 –

相關問題