我們試圖將ActiveMQ 5.9.0作爲使用JMS主題的消息代理進行設置,但我們在使用消息時遇到了一些問題。帶有JMS主題的ActiveMQ - 消費者未出隊的一些消息
出於測試目的,我們有1個主題,1個事件生產者和1個消費者的簡單配置。我們一個接一個地發送10條消息,但每次運行應用程序時,都會消耗1-3條消息!其他消息被消耗並且處理得很好。 我們可以看到,即使我們重新啓動應用程序(我們可以看到「Enqueue」和「Dequeue」中的數字),我們發佈到ActiveMQ管理控制檯中的主題的所有消息,但他們永遠不會到達消費者,列是不同的)。
編輯:我還應該提到,當使用隊列而不是主題時,不會出現此問題。
這是怎麼發生的?它可能與atomikos(這是交易管理器)有關嗎?或者也許在配置中的其他東西?任何想法/建議都歡迎。 :)
這是的ActiveMQ/JMS彈簧配置:
<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="amq" />
<property name="xaConnectionFactory">
<bean class="org.apache.activemq.spring.ActiveMQXAConnectionFactory"
p:brokerURL="${activemq_url}" />
</property>
<property name="maxPoolSize" value="10" />
<property name="localTransactionMode" value="false" />
</bean>
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory" />
</bean>
<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="sessionTransacted" value="true" />
<property name="pubSubDomain" value="true"/>
</bean>
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="test.topic" />
</bean>
<!-- The Spring message listener container configuration -->
<jms:listener-container destination-type="topic"
connection-factory="connectionFactory" transaction-manager="transactionManager"
acknowledge="transacted" concurrency="1">
<jms:listener destination="test.topic" ref="testReceiver"
method="receive" />
</jms:listener-container>
生產者:
@Component("producer")
public class EventProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Transactional
public void produceEvent(String message) {
this.jmsTemplate.convertAndSend("test.topic", message);
}
}
消費者:
@Component("testReceiver")
public class EventListener {
@Transactional
public void receive(String message) {
System.out.println(message);
}
}
測試:
@Autowired
private EventProducer eventProducer;
public void testMessages() {
for (int i = 1; i <= 10; i++) {
this.eventProducer.produceEvent("message" + i);
}
嗨加里, 謝謝你的回答,但我不知道我理解它。 首先,我們沒有使用JUnit,這只是我們創建的一個簡單場景,用於在添加更多主題,偵聽器等之前檢查配置是否工作。 此外,丟失的消息不一定是我們發送的第一個(So一些消息在丟失之前已經被接收)。 在我們開始發送之前,我們嘗試添加Thread.sleep(2000),但它沒有解決問題,並且我們也不會在真實場景中執行它...... 我會檢查您提供的鏈接並看看它是否有幫助。 :) – Ayelet
另外,您是否可以舉例說明如何在spring xmls中配置持久訂閱? 謝謝!:) – Ayelet
我改變了我的答案,刪除對JUnit的引用;我的意思是測試一般你在哪裏發送和接收在同一個應用程序。確保在發送消息之前上下文已完全刷新並且消費者已啓動。查看TRACE級別的日誌以觀察消費者開始消費。持久訂閱:請參閱此處的clientId,destinationType和訂閱:http://docs.spring.io/spring-framework/docs/current/spring-framework-reference/html/jms.html#jms-namespace記住訂閱在第一次訂閱完成之前不會持續(因此您的第一次嘗試可能會失敗)。 –