0
更新:似乎更簡單的測試用例不起作用:試圖通過進程中的代理將消息從ActiveMQ生產者發送給ActiveMQ使用者。下面是代碼:alkka-camel和ActiveMQ的請求回覆
val brokerURL = "vm://localhost?broker.persistent=false"
val connectionFactory = new ActiveMQConnectionFactory(brokerURL)
val connection = connectionFactory.createConnection()
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val queue = session.createQueue("foo.bar")
val producer = session.createProducer(queue)
val consumer = session.createConsumer(queue)
val message = session.createTextMessage("marco")
producer.send(message)
val resp = consumer.receive(2000)
assert(resp != null)
我試圖實現使用阿卡駝一個非常簡單的請求 - 應答模式。下面是它試圖直接使用ActiveMQ的發送消息和預期的響應我的(測試平臺)代碼:爲消費者演員
val brokerURL = "vm://localhost?broker.persistent=false"
// create in-process broker, session, queue, etc...
val connectionFactory = new ActiveMQConnectionFactory(brokerURL)
val connection = connectionFactory.createConnection()
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val queue = session.createQueue("myapp.somequeue")
val producer = session.createProducer(queue)
val tempDest = session.createTemporaryQueue()
val respConsumer = session.createConsumer(tempDest)
val message = session.createTextMessage("marco")
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID("myCorrelationID")
// create actor system with CamelExtension
val camel = CamelExtension(system)
val camelContext = camel.context
camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerURL))
val listener = system.actorOf(Props[Frontend])
// send a message, expect a response
producer.send(message)
val resp: TextMessage = respConsumer.receive(5000).asInstanceOf[TextMessage]
assert(resp.getText() == "polo")
我已經試過兩種不同的方法。第一個是簡單的,它試圖利用sender !
迴應:
class Frontend extends Actor with Consumer {
def endpointUri = "activemq:myapp.somequeue"
override def autoAck = false
def receive = {
case msg: CamelMessage => {
println("received %s" format msg.bodyAs[String])
sender ! "polo"
}
}
}
第二個嘗試回覆使用CamelTemplate:
class Frontend extends Actor with Consumer {
def endpointUri = "activemq:myapp.somequeue"
override def autoAck = false
def receive = {
case msg: CamelMessage => {
println("received %s" format msg.bodyAs[String])
val replyTo = msg.getHeaderAs("JMSReplyTo", classOf[ActiveMQTempQueue], camelContext)
val correlationId = msg.getHeaderAs("JMSCorrelationID", classOf[String], camelContext)
camel.template.sendBodyAndHeader("activemq:"+replyTo.getQueueName(), "polo", "JMSCorrelationID", correlationId)
}
}
}
我看到我的演員的接收方法println()
輸出,所以ActiveMQ的消息正在進入演員,但我在測試臺中呼叫respConsumer.receive()
超時。我已經嘗試了很多組合的指定和不在回覆中指定標題。我也嘗試啓用和禁用autoAck
。
在此先感謝。