2014-01-12 68 views
1

我們試圖將ActiveMQ 5.9.0作爲使用JMS主題的消息代理進行設置,但我們在使用消息時遇到了一些問題。帶有JMS主題的ActiveMQ - 消費者未出隊的一些消息

出於測試目的,我們有1個主題,1個事件生產者和1個消費者的簡單配置。我們一個接一個地發送10條消息,但每次運行應用程序時,都會消耗1-3條消息!其他消息被消耗並且處理得很好。 我們可以看到,即使我們重新啓動應用程序(我們可以看到「Enqueue」和「Dequeue」中的數字),我們發佈到ActiveMQ管理控制檯中的主題的所有消息,但他們永遠不會到達消費者,列是不同的)。

編輯:我還應該提到,當使用隊列而不是主題時,不會出現此問題。

這是怎麼發生的?它可能與atomikos(這是交易管理器)有關嗎?或者也許在配置中的其他東西?任何想法/建議都歡迎。 :)

這是的ActiveMQ/JMS彈簧配置:

<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" 
    init-method="init" destroy-method="close"> 
    <property name="uniqueResourceName" value="amq" /> 
    <property name="xaConnectionFactory"> 
     <bean class="org.apache.activemq.spring.ActiveMQXAConnectionFactory" 
      p:brokerURL="${activemq_url}" /> 
    </property> 
    <property name="maxPoolSize" value="10" /> 
    <property name="localTransactionMode" value="false" /> 
</bean> 

<bean id="cachedConnectionFactory" 
    class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="connectionFactory" /> 
</bean> 

<!-- A JmsTemplate instance that uses the cached connection and destination --> 
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="cachedConnectionFactory" /> 
    <property name="sessionTransacted" value="true" /> 
    <property name="pubSubDomain" value="true"/> 
</bean> 

<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic"> 
    <constructor-arg value="test.topic" /> 
</bean> 

<!-- The Spring message listener container configuration --> 
<jms:listener-container destination-type="topic" 
    connection-factory="connectionFactory" transaction-manager="transactionManager" 
    acknowledge="transacted" concurrency="1"> 
    <jms:listener destination="test.topic" ref="testReceiver" 
     method="receive" /> 
</jms:listener-container> 

生產者:

@Component("producer") 
public class EventProducer { 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    @Transactional 
    public void produceEvent(String message) { 
     this.jmsTemplate.convertAndSend("test.topic", message); 
    } 
} 

消費者:

@Component("testReceiver") 
public class EventListener { 

    @Transactional 
    public void receive(String message) { 
     System.out.println(message); 
    } 
} 

測試:

@Autowired 
    private EventProducer eventProducer; 

    public void testMessages() { 

    for (int i = 1; i <= 10; i++) { 
     this.eventProducer.produceEvent("message" + i); 
    } 

回答

3

這就是JMS主題的本質 - 只有當前訂閱者默認接收郵件。在容器啓動後,您有競爭條件並在消費者建立其訂閱之前發送消息。這是單元/集成測試中常見的錯誤,其中包含您在同一應用程序中發送和接收的主題。

隨着Spring的更新版本,有一個method you can poll to wait until the subscriber is established(從3.1,我認爲)。或者,您可以在開始發送之前稍等片刻,或者您可以使訂閱持久。

+0

嗨加里, 謝謝你的回答,但我不知道我理解它。 首先,我們沒有使用JUnit,這只是我們創建的一個簡單場景,用於在添加更多主題,偵聽器等之前檢查配置是否工作。 此外,丟失的消息不一定是我們發送的第一個(So一些消息在丟失之前已經被接收)。 在我們開始發送之前,我們嘗試添加Thread.sleep(2000),但它沒有解決問題,並且我們也不會在真實場景中執行它...... 我會檢查您提供的鏈接並看看它是否有幫助。 :) – Ayelet

+0

另外,您是否可以舉例說明如何在spring xmls中配置持久訂閱? 謝謝!:) – Ayelet

+1

我改變了我的答案,刪除對JUnit的引用;我的意思是測試一般你在哪裏發送和接收在同一個應用程序。確保在發送消息之前上下文已完全刷新並且消費者已啓動。查看TRACE級別的日誌以觀察消費者開始消費。持久訂閱:請參閱此處的clientId,destinationType和訂閱:http://docs.spring.io/spring-framework/docs/current/spring-framework-reference/html/jms.html#jms-namespace記住訂閱在第一次訂閱完成之前不會持續(因此您的第一次嘗試可能會失敗)。 –

相關問題