2013-05-10 57 views
0

我正在尋找插件方法(jms和/或駱駝路由插件)從Grails中使用ActiveMQ的替代方案。目前爲止這麼好,但我無法找到任何管理連接的好方法。Grails和ActiveMQ:如何管理DefaultMessageListenerContainer的連接

這裏是我的config /春/ resources.groovy:

import org.springframework.jms.connection.SingleConnectionFactory 
import org.apache.activemq.ActiveMQConnectionFactory 
import org.springframework.jms.listener.adapter.MessageListenerAdapter 
import org.springframework.jms.listener.DefaultMessageListenerContainer 

beans = { 
    jmsConnectionFactory(SingleConnectionFactory) { 
     targetConnectionFactory = { ActiveMQConnectionFactory cf -> 
      brokerURL = "tcp://localhost:61616" 
     } 
    } 

    jmsMessageListener(MessageListenerAdapter, ref("myService")) { 
     defaultListenerMethod = "onIncomingMessage" 
    } 

    jmsContainer(DefaultMessageListenerContainer) { 
     connectionFactory = jmsConnectionFactory 
     destinationName = "StatusSavedTopic" 
     messageListener = jmsMessageListener 
     autoStartup = true 

     // Tells the magic sauce to be an ActiveMQ topic 
     pubSubDomain = true 
    } 
} 

如果我設置autoStartup爲true,它正常工作與run-app直到我救我的服務,引起了重新編譯。發生這種情況時,連接將被丟棄(通過檢查ActiveMQ Web控制檯確認),並且不會收到更多消息(顯然)。

有沒有辦法確保我的jmsContainer保持活着,除非手動執行此操作並使用jms或路由插件?

+1

不知道這是相關的,但轉接插件(http://grails.org/plugin/routing)文檔指出以下:「重裝路線已被刪除,Apache Camel不支持路由重新加載,現有的hack在該庫的最新版本中無法正常工作。「 (我知道你沒有使用該插件,但也許你的問題的根本原因是相同的) – 2013-05-10 14:22:32

回答

2

我在回答自己,因爲我找到了解決方案。

解決方案是在進程中嵌入ActiveMQ代理。我總是向這個嵌入式代理髮布消息,我不必測試其可用性或處理重新連接,原因很明顯 - 在我的過程中,它就在那裏。

ActiveMQ本身就有一種連接代理的方式,因此一個消息總是被傳遞到下一個,而ActiveMQ自動處理所有連接的東西。初始連接(如果代理在應用程序啓動時關閉),或者如果代理在運行中關閉,則重新連接。

您可以自由選擇嵌入式代理的存儲。如果您將郵件持久存儲到磁盤,ActiveMQ還會負責在消息在嵌入式代理中排隊後,但在ActiveMQ設法交付給中央代理之前,您的應用停機時,將消息發佈到中央代理。

下面是grails-app/conf/spring中用於設置應用程序的bean代碼。我有點春天的小菜,所以我確信有更好的方法。使用MethodInvokingFactoryBean可以很容易地完成Spring自身的所有初始化初始化操作,這有助於我在演出中獲得這個展示。

activemqLocalMessageDeliveryConnection(org.springframework.beans.factory.config.MethodInvokingFactoryBean) { 
    targetClass = "com.myapp.ActiveMQLocalBrokerHelper" 
    targetMethod = "createConnection" 
    arguments = [ref("grailsApplication")] 
} 

activemqLocalMessageDeliveryProducerSession(org.springframework.beans.factory.config.MethodInvokingFactoryBean) { 
    targetClass = "com.myapp.ActiveMQLocalBrokerHelper" 
    targetMethod = "createProducerSession" 
    arguments = [ref("activemqLocalMessageDeliveryConnection")] 
} 

activemqLocalMessageDeliveryProducer(org.springframework.beans.factory.config.MethodInvokingFactoryBean) { bean -> 
    targetClass = "com.myapp.ActiveMQLocalBrokerHelper" 
    targetMethod = "createProducer" 
    arguments = [ref("grailsApplication"), ref("activemqLocalMessageDeliveryProducerSession")] 
} 

以下是ActiveMQLocalBrokerHelper的代碼。

package com.myapp 
import java.io.File 
import org.apache.activemq.command.ActiveMQDestination 

class ActiveMQLocalBrokerHelper { 
    static javax.jms.Connection createConnection(grailsApplication) { 
     // http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html 
     def localBrokerName = "localMessageDelivery" 

     def broker = new org.apache.activemq.broker.BrokerService() 
     broker.setPersistent(false) 
     broker.setBrokerName(localBrokerName) 
     broker.start() 

     def connFactory = new org.apache.activemq.ActiveMQConnectionFactory(broker.getVmConnectorURI()); 
     def conn = connFactory.createConnection(); 
     conn.start() 

     return conn 
    } 

    static javax.jms.Session createProducerSession(conn) { 
     def session = conn.createSession(true, javax.jms.Session.AUTO_ACKNOWLEDGE); 
     return session 
    } 

    static javax.jms.MessageProducer createProducer(grailsApplication, session) { 
     def destination = session.createQueue("MessageDelivery") 
     def producer = session.createProducer(destination) 
     producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT) 
     return producer 
    } 
} 

這就是您需要獲得嵌入式ActiveMQ代理的全部內容。這個特殊的不是持久的,而且是事務性的。當然,您可以自由使用ActiveMQ Java API來做任何事情。要排隊和使用消息,只需使用正常的ActiveMQ API。例如:

def activemqLocalMessageDeliveryProducer // Inject the producer 
def activemqLocalMessageDeliveryProducerSession // And the session 

void enqueue(String messageText) { 
    def message = session.createTextMessage(messageText) 
    producer.send(message) 
    session.commit() 
} 

對於消費:

def activemqLocalMessageDeliveryConnection // Inject the connection 

void consume() { 
    def activemqSession = activemqLocalMessageDeliveryConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
    def destination = activemqSession.createQueue("MyQueueName") 
    def consumer = activemqSession.createConsumer(destination) 
    while (true) { 
     def message = consumer.receive() 
     // Do something with message 
    } 
}