我是使用JMS/ActiveMQ的新手,我有一個Spring/Hibernate應用程序從ActiveMQ中的隊列中獲取消息並處理這些消息以實現持久性。由於這些消息需要一段時間來處理和保存,因此我將DefaultMessageListenerContainer配置爲擁有多個消費者(例如5-10),以便可以同時處理多個消息。我查看了很多ActiveMQ和Spring API文檔,我認爲我需要做的就是將maxConcurrentConsumers設置爲10 +將concurrentConsumers設置爲5,或者將DefaultMessageListenerContainer的併發性設置爲5-10。一旦我這樣做了,我可以從ActiveMQ的內置控制檯看到我的隊列確實有5個用戶。但是,當我在隊列中放置10條或100條消息時,處理似乎是單線程的,我添加了一條日誌行來打印線程ID,並且它似乎是按順序處理所有請求的相同線程ID。從控制檯上的ActiveMQ隊列頁面,我點擊Browse Active Consumer鏈接查看正在發生的事情,看起來像一個消費者有100個消息待處理,而另一個消息沒有其他消息4.Spring JMS ActiveMQ - 無法讓多個消費者同時處理
我做了一些研究,發現這篇文章來自Spring(http://forum.springsource.org/showthread.php?61170-Messages-missed-using-DefaultMessageListenerContainer),並添加了值爲2的預取策略,認爲每個消費者正在爲1000條消息簽名。現在,當我發送另一批消息時,一位消費者將有2-3條待處理消息,但其他4位消費者保持空閒狀態,並且最終由一位消費者依次處理所有消息。在這一點上,我想也許這是我在ActiveMQ代理上錯誤配置的東西。我在文檔中看到,默認調度策略是循環策略,但我在activemq.xml中看到名爲constantPendingMessageLimitStrategy的設置設置爲1000,並嘗試將其設置爲非常小的數字(例如2),認爲它控制了多少消息經紀人一次發送給消費者,但仍然沒有做任何事情。希望有人可以指出我做錯了什麼,我已經在下面發佈了我的spring配置文件,除了嘗試使用一個設置(constantPendingMessageLimitStrategy)之外,我還沒有觸及過activemq.xml。我正在使用ActiveMQ 5.8。
<bean id="importRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="15000" />
<property name="maximumRedeliveries" value="-1" />
<property name="useExponentialBackOff" value="true" />
<property name="backOffMultiplier" value="2" />
</bean>
<bean id="importPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="all" value="2"></property>
</bean>
<bean id="importConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${import.queue.url}"/>
<property name="redeliveryPolicy" ref="importRedeliveryPolicy" />
<property name="prefetchPolicy" ref="importPrefetchPolicy"></property>
</bean>
<bean id="importQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${import.queue.name}" />
</bean>
<bean id="importListener" class="com.mycompany.ImportQueueListener" >
<property name="importService" ref="importService"></property>
<property name="sessionFactory" ref="sessionFactory"/>
</bean>
<bean id="importJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="importConnectionFactory" />
<property name="destination" ref="importQueue" />
<property name="messageListener" ref="importListener" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="10"></property>
<property name="concurrentConsumers" value="5"></property>
</bean>
與配置多個監聽器上面的代碼中使用 – Hari
歡迎[這麼]!這些類型的東西應該放在答案本身,而不是評論 - 他們只是爲了澄清一個職位。謝謝! –