2012-03-27 85 views
2

正確配置我們擁有一個ActiveMQ/Camel配置,此配置以前一直使用消息隊列和併發消費者。但是,我們現在正在介紹消息主題,並且發現由於併發消費者 - 主題中收到的消息被消耗了很多次。ActiveMQ:使用兩個隊列(併發使用者)和主題

此場景的正確配置是什麼?即,我們希望多個併發消費者用於在隊列上接收到的消息,但是隻有單個消費者爲在主題上接收到的消息而定義消費者。

這裏的當前配置:

<amq:connectionFactory id="amqConnectionFactory" 
    useAsyncSend="true" brokerURL="${${ptl.Servername}.jms.cluster.uri}" 
    userName="${jms.username}" password="${jms.password}" sendTimeout="1000" 
    optimizeAcknowledge="true" disableTimeStampsByDefault="true"> 
</amq:connectionFactory> 

<bean id="cachingConnectionFactory" 
    class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 
    <property name="cacheConsumers" value="true"></property> 
    <property name="cacheProducers" value="true"></property> 
    <property name="reconnectOnException" value="true"></property> 
    <property name="sessionCacheSize" value="${jms.sessioncachesize}"></property> 
</bean> 

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" /> 
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

回答

3

你可以明確的設置concurrentConsumers/maxConcurrentConsumers到任何主題消費者的「1」。

from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")... 

可替代地,設置JmsConfiguration併發/ maxConcurrentConsumers屬性爲「1」,然後明確地根據需要啓用隊列併發消耗。

from("activemq:queue:myQueue?maxConcurrentConsumers=5")... 

還,您可以使用Virtual Topics來沒有得到重複進行主題郵件的併發量(強烈建議在傳統主題)

0

我結束了使用的解決方案是創建一個單獨的jmsConfig/ActiveMQ的配置塊。

總CONFIGRATION如下所示:

<!-- This is appropriate for consuming Queues, but not topics. For topics, use 
jmsTopicConfig/activemqTopics --> 
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" /> 
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

<!-- This config limits to a single concurrent consumer. This config is appropriate for 
consuming Topics, not Queues. --> 
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="1" /> 
    <property name="maxConcurrentConsumers" value="1" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsTopicConfig" /> 
</bean> 

然後,在駱駝管道,消耗掉activemqTopics豆的話題,如下:

<camel:route id="myTopicResponder"> 
    <camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" /> 
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/> 
</camel:route> 
相關問題