2015-12-22 25 views
1

我在SpringXD 1.3.0上測試。將消息複製到不同的接收器。我的配置是由RabbitMQ支持的三節點羣集作爲消息總線。 我的測試是這樣的:增加主題支持隊列的併發性:通道

  1. 首例

    stream create sourceToDuplicate --definition "trigger --fixedDelay=1 
    --timeUnit=MILLISECONDS --payload='test' > topic:test" --deploy 
    stream create processMessages1 --definition "topic:test > cassandra --initScript=file:<absolut-path-to>/int-db.cql --ingestQuery='insert into book (isbn, title, author) values (uuid(), ?, ?)'" 
    stream create processMessages2 --definition "topic:test > aggregator --count=1000 --timeout=1000 | file" --deploy 
    

現在,爲了增加對卡桑德拉下沉的消費者,我想部署與「模塊第一流.cassandra.consumer.concurrency = 10" 。該屬性使部署失敗。

我現在的解決辦法是第四流,這樣我就可以增加消費者:

  • 第二種情況

    stream create topicToQueue1 --definition "topic:test > queue:test1" --deploy 
    stream create processMessage1 --definition "queue:test1 > cassandra..." 
    stream deploy processMessage1 --properties "module.cassandra.consumer.concurrency=10" 
    
  • 最後我的問題:爲什麼第一個用例會失敗,如果rabbitmq已經爲該主題添加了一個隊列:允許更多用戶允許的頻道?

    聖誕快樂給大家

    ---更新---

    版本:SpringXD 1.3.0.RELEASE

    錯誤:

    2015-12-18T13:58:28+0100 1.3.0.RELEASE INFO DeploymentSupervisor-0 
    zk.ZKStreamDeploymentHandler - Deployment status for stream 'processMessage1':  
    DeploymentStatus{state=failed,error(s)=java.lang.IllegalArgumentException:  
    RabbitMessageBus does not support consumer property: concurrency for processMessage1.topic:test. 
    at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateProperties(MessageBusSupport.java:786) 
        at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateConsumerProperties(MessageBusSupport.java:757) 
        at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus.bindPubSubConsumer(RabbitMessageBus.java:472) 
        at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageConsumer(AbstractMessageBusBinderPlugin.java:275) 
        at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:155) 
        at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73) 
        at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238) 
        at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218) 
        at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200) 
        at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365) 
        at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334) 
        at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181) 
        at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149) 
        at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:509) 
        at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:503) 
        at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:92) 
        at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) 
        at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83) 
        at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500) 
        at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35) 
        at org.apache.curator.framework.recipes.cache.PathChildrenCache$10.run(PathChildrenCache.java:762) 
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
        at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
        at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
        at java.lang.Thread.run(Thread.java:745) 
    
    +0

    '>此屬性讓部署失敗。「您遇到什麼錯誤?什麼版本的XD? (編輯問題,不要嘗試在評論中添加堆棧跟蹤)。 –

    +0

    版本是1.3.0 ...我編輯問題 –

    回答

    0

    你不能有併發在名爲channel的topic:上> 1,否則每個線程都將獲得消息的副本。

    如果要在指定通道上使用併發性,它必須是queue:,這樣每個線程纔會競爭消息。

    +0

    '否則每個線程都會得到一個消息的副本。「謝謝!現在我明白爲什麼第一個案件失敗了。這意味着消息必須從主題:測試移動到幾個隊列? –

    +0

    考慮到RabbitMessageBus的架構,這個限制事實上可以被移除,但這需要改變XD(隨意打開一個改進[JIRA Issue](https://jira.spring.io/browse)/XD),你可以通過綁定第二個隊列到爲指定的'topic:'channel創建的扇出交換來解決這個問題,然後使用兔子源從cassandra流的隊列中消費 - 這裏沒有併發限制rabbit source。或者,調用附加隊列'queue:foo'並且你可以使用一個命名通道 - 你必須手動綁定隊列。 –