2013-04-12 71 views
2

我是使用JMS/ActiveMQ的新手,我有一個Spring/Hibernate應用程序從ActiveMQ中的隊列中獲取消息並處理這些消息以實現持久性。由於這些消息需要一段時間來處理和保存,因此我將DefaultMessageListenerContainer配置爲擁有多個消費者(例如5-10),以便可以同時處理多個消息。我查看了很多ActiveMQ和Spring API文檔,我認爲我需要做的就是將maxConcurrentConsumers設置爲10 +將concurrentConsumers設置爲5,或者將DefaultMessageListenerContainer的併發性設置爲5-10。一旦我這樣做了,我可以從ActiveMQ的內置控制檯看到我的隊列確實有5個用戶。但是,當我在隊列中放置10條或100條消息時,處理似乎是單線程的,我添加了一條日誌行來打印線程ID,並且它似乎是按順序處理所有請求的相同線程ID。從控制檯上的ActiveMQ隊列頁面,我點擊Browse Active Consumer鏈接查看正在發生的事情,看起來像一個消費者有100個消息待處理,而另一個消息沒有其他消息4.Spring JMS ActiveMQ - 無法讓多個消費者同時處理

我做了一些研究,發現這篇文章來自Spring(http://forum.springsource.org/showthread.php?61170-Messages-missed-using-DefaultMessageListenerContainer),並添加了值爲2的預取策略,認爲每個消費者正在爲1000條消息簽名。現在,當我發送另一批消息時,一位消費者將有2-3條待處理消息,但其他4位消費者保持空閒狀態,並且最終由一位消費者依次處理所有消息。在這一點上,我想也許這是我在ActiveMQ代理上錯誤配置的東西。我在文檔中看到,默認調度策略是循環策略,但我在activemq.xml中看到名爲constantPendingMessageLimitStrategy的設置設置爲1000,並嘗試將其設置爲非常小的數字(例如2),認爲它控制了多少消息經紀人一次發送給消費者,但仍然沒有做任何事情。希望有人可以指出我做錯了什麼,我已經在下面發佈了我的spring配置文件,除了嘗試使用一個設置(constantPendingMessageLimitStrategy)之外,我還沒有觸及過activemq.xml。我正在使用ActiveMQ 5.8。

<bean id="importRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 
    <property name="initialRedeliveryDelay" value="15000" /> 
    <property name="maximumRedeliveries" value="-1" /> 
    <property name="useExponentialBackOff" value="true" /> 
    <property name="backOffMultiplier" value="2" /> 
</bean> 

<bean id="importPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"> 
    <property name="all" value="2"></property> 
</bean> 

<bean id="importConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <property name="brokerURL" value="${import.queue.url}"/> 
    <property name="redeliveryPolicy" ref="importRedeliveryPolicy" /> 
    <property name="prefetchPolicy" ref="importPrefetchPolicy"></property> 
</bean> 

<bean id="importQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
    <constructor-arg value="${import.queue.name}" /> 
</bean> 

<bean id="importListener" class="com.mycompany.ImportQueueListener" > 
    <property name="importService" ref="importService"></property> 
    <property name="sessionFactory" ref="sessionFactory"/> 
</bean> 

<bean id="importJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
    <property name="connectionFactory" ref="importConnectionFactory" /> 
    <property name="destination" ref="importQueue" /> 
    <property name="messageListener" ref="importListener" /> 
    <property name="sessionTransacted" value="true" /> 
    <property name="maxConcurrentConsumers" value="10"></property> 
    <property name="concurrentConsumers" value="5"></property> 
</bean> 

回答

0

你應該嘗試更換您的ActiveMQConnectionFactory帶游泳池:

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy method="stop"> 
<property name="connectionFactory"> 
    <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <property name="brokerURL"> 
     <value>tcp://localhost:61616</value> 
     </property> 
    </bean> 
    </property> 
</bean> 
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory"> 
    <ref local="jmsFactory"/> 
    </property> 
</bean> 
0
<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:p="http://www.springframework.org/schema/p" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" 
xmlns:tx="http://www.springframework.org/schema/tx" 
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd 
http://www.springframework.org/schema/tx 
http://www.springframework.org/schema/tx/spring-tx.xsd 
http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context.xsd 
http://www.springframework.org/schema/jms 
http://www.springframework.org/schema/jms/spring-jms.xsd"> 




<!-- <bean class="org.apache.activemq.command.ActiveMQQueue" id="destination"> 
    <constructor-arg value="TEST.Q1"></constructor-arg> 
    </bean>--> 

    <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> 
    <constructor-arg value="TOPIC_NAME" /> 
    </bean> 



<bean class="org.springframework.jms.core.JmsTemplate" id="producerTemplate"> 
    <property name="connectionFactory" ref="connectionFactory"/> 
    <property name="defaultDestination" ref="destination"/> 
</bean> 

<!--ActiveMq broker URL configured here--> 
    <bean class="org.apache.activemq.ActiveMQConnectionFactory" id="connectionFactory" > 
     <property name="brokerURL"> 
      <value>tcp://localhost:61616</value> 
     </property> 
    </bean> 

    <!--producer configured here--> 

    <bean class="Producer" id="simpleMessageProducer"> 
     <property name="jmsTemplate" ref="producerTemplate"></property> 
    </bean> 


    <!--listeners configured here--> 

    <bean class="Consumer" id="simpleMessageListener"> 

     </bean> 
    <bean class="ConsumerSecond" id="simpleMessageListenerSecond"> </bean> 



    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer"> 
    <property name="connectionFactory" ref="connectionFactory"></property> 
    <property name="destination" ref="destination"></property> 
    <property name="messageListener" ref="simpleMessageListener"></property> 


    </bean> 
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer1"> 
    <property name="connectionFactory" ref="connectionFactory"></property> 
    <property name="destination" ref="destination"></property> 
    <property name="messageListener" ref="simpleMessageListenerSecond"></property> 

    </bean> 





</beans> 
+0

與配置多個監聽器上面的代碼中使用 – Hari

+0

歡迎[這麼]!這些類型的東西應該放在答案本身,而不是評論 - 他們只是爲了澄清一個職位。謝謝! –