2014-05-02 52 views
0

更新:似乎更簡單的測試用例不起作用:試圖通過進程中的代理將消息從ActiveMQ生產者發送給ActiveMQ使用者。下面是代碼:alkka-camel和ActiveMQ的請求回覆

val brokerURL = "vm://localhost?broker.persistent=false" 
val connectionFactory = new ActiveMQConnectionFactory(brokerURL) 
val connection = connectionFactory.createConnection() 
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
val queue = session.createQueue("foo.bar") 
val producer = session.createProducer(queue) 
val consumer = session.createConsumer(queue) 
val message = session.createTextMessage("marco") 

producer.send(message) 
val resp = consumer.receive(2000) 
assert(resp != null) 

我試圖實現使用阿卡駝一個非常簡單的請求 - 應答模式。下面是它試圖直接使用ActiveMQ的發送消息和預期的響應我的(測試平臺)代碼:爲消費者演員

val brokerURL = "vm://localhost?broker.persistent=false" 

// create in-process broker, session, queue, etc... 
val connectionFactory = new ActiveMQConnectionFactory(brokerURL) 
val connection = connectionFactory.createConnection() 
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
val queue = session.createQueue("myapp.somequeue") 
val producer = session.createProducer(queue) 
val tempDest = session.createTemporaryQueue() 
val respConsumer = session.createConsumer(tempDest) 
val message = session.createTextMessage("marco") 
message.setJMSReplyTo(tempDest) 
message.setJMSCorrelationID("myCorrelationID") 

// create actor system with CamelExtension 
val camel = CamelExtension(system) 
val camelContext = camel.context 
camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerURL)) 
val listener = system.actorOf(Props[Frontend]) 

// send a message, expect a response 
producer.send(message) 
val resp: TextMessage = respConsumer.receive(5000).asInstanceOf[TextMessage] 
assert(resp.getText() == "polo") 

我已經試過兩種不同的方法。第一個是簡單的,它試圖利用sender !迴應:

class Frontend extends Actor with Consumer { 
    def endpointUri = "activemq:myapp.somequeue" 
    override def autoAck = false 
    def receive = { 
    case msg: CamelMessage => { 
     println("received %s" format msg.bodyAs[String]) 
     sender ! "polo" 
    } 
    } 
} 

第二個嘗試回覆使用CamelTemplate:

class Frontend extends Actor with Consumer { 
    def endpointUri = "activemq:myapp.somequeue" 
    override def autoAck = false 
    def receive = { 
    case msg: CamelMessage => { 
     println("received %s" format msg.bodyAs[String]) 
     val replyTo = msg.getHeaderAs("JMSReplyTo", classOf[ActiveMQTempQueue], camelContext) 
     val correlationId = msg.getHeaderAs("JMSCorrelationID", classOf[String], camelContext) 
     camel.template.sendBodyAndHeader("activemq:"+replyTo.getQueueName(), "polo", "JMSCorrelationID", correlationId) 
    } 
    } 
} 

我看到我的演員的接收方法println()輸出,所以ActiveMQ的消息正在進入演員,但我在測試臺中呼叫respConsumer.receive()超時。我已經嘗試了很多組合的指定和不在回覆中指定標題。我也嘗試啓用和禁用autoAck

在此先感謝。

回答

1

原來我需要在JMS代碼中調用connection.start()。