最近,我一直在嘗試讓Reply-To模式在Apache NMS/ActiveMQ中工作,並且一直只有使用臨時隊列的名稱。僅使用ReplyTo Name發送響應到ActiveMQ臨時隊列
該項目是調度服務,它從總線檢索請求並將它們發送到另一個進程/運行時(基於複雜的路由標準)來處理請求。然後,這個單獨的處理器使用回覆隊列名稱和關聯ID來製作響應,並將其發送給同一代理中的原始請求者,但是是不同的連接。
問題是,如果您在消息的NMSReplyTo標頭中擁有IDestination對象引用,則只能發送到臨時隊列(或主題)。如果該引用丟失,則無法通過簡單地使用其名稱將消息發送到臨時隊列(或主題)。
說明這個問題就是這樣簡單的「Pong」服務,它偵聽消息隊列並使用NMS Reply-To頭的內容向請求者發出響應。它通過簡單地調用ProcessMessage(string,string)方法來模擬將請求分派給另一個進程。
using System;
using Apache.NMS;
namespace PongService
{
/// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
class PongService
{
static ISession session = null;
static IMessageProducer producer = null;
public static void Main(string[] args)
{
Uri connecturi = new Uri("activemq:tcp://localhost:61616");
Console.WriteLine("Connecting to " + connecturi);
IConnectionFactory factory = new NMSConnectionFactory(connecturi);
IConnection connection = factory.CreateConnection();
session = connection.CreateSession();
IDestination destination = session.GetQueue("PONG.CMD");
Console.WriteLine("Using destination: " + destination);
producer = session.CreateProducer(null);
IMessageConsumer consumer = session.CreateConsumer(destination);
connection.Start();
consumer.Listener += new MessageListener(OnMessage);
Console.WriteLine("Press any key to terminate Pong service . . .");
// loop until a key is pressed
while (!Console.KeyAvailable)
{
try { System.Threading.Thread.Sleep(50); }
catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
} // loop
Console.Write("Closing connection...");
consumer.Close();
producer.Close();
session.Close();
connection.Close();
Console.WriteLine("done.");
}
/// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
/// <param name="receivedMsg">The message received on the request queue.</param>
protected static void OnMessage(IMessage receivedMsg)
{
// mimic the operation of passing this request to an external processor which can connect
// to the broker but will not have references to the session objects including destinations
Console.WriteLine("Sending request to an external processor");
ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
}
/// <summary>Models a worker in another process/runtime.</summary>
/// <param name="queuename">Where to send the results of processing</param>
/// <param name="crid">Correlation identifier of the request.</param>
protected static void ProcessMessage(string queuename, string crid)
{
ITextMessage response = session.CreateTextMessage("Pong!");
response.NMSCorrelationID = crid;
IDestination destination = session.GetQueue(queuename);
Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'");
try
{
producer.Send(destination, response);
}
catch (Exception ex)
{
Console.Error.WriteLine("Could not send response: " + ex.Message);
}
}
}
}
現在爲客戶端。它只是創建一個臨時隊列,開始監聽它,然後在我們的「Pong」服務正在偵聽的隊列上發送一個請求。請求消息包含臨時隊列的IDestination。
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;
namespace PongClient
{
class PongClient
{
protected static AutoResetEvent semaphore = new AutoResetEvent(false);
protected static ITextMessage message = null;
protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);
public static void Main(string[] args)
{
Uri connecturi = new Uri("activemq:tcp://localhost:61616");
Console.WriteLine("About to connect to " + connecturi);
IConnectionFactory factory = new NMSConnectionFactory(connecturi);
IConnection connection = factory.CreateConnection();
ISession session = connection.CreateSession();
IDestination temporaryDestination = session.CreateTemporaryQueue();
Console.WriteLine("Private destination: " + temporaryDestination);
IDestination destination = session.GetQueue("PONG.CMD");
Console.WriteLine("Service destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
consumer.Listener += new MessageListener(OnMessage);
IMessageProducer producer = session.CreateProducer(destination);
connection.Start();
// Send a request message
ITextMessage request = session.CreateTextMessage("Ping");
request.NMSCorrelationID = Guid.NewGuid().ToString();
request.NMSReplyTo = temporaryDestination;
producer.Send(request);
// Wait for the message
semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
if (message == null)
{
Console.WriteLine("Timed-Out!");
}
else
{
Console.WriteLine("Received message with ID: " + message.NMSMessageId);
Console.WriteLine("Received message with text: " + message.Text);
}
}
protected static void OnMessage(IMessage receivedMsg)
{
message = receivedMsg as ITextMessage;
semaphore.Set();
}
}
}
Pong過程似乎正常工作,只是它正在製作一個全新的,與Reply-To頭中指定的隊列分離的隊列。
這裏所涉及的技術的版本:
- Apache.NMS.ActiveMQ V1.5.1
- Apache.NMS API V1.5.0
- 的ActiveMQ 5.5.0
- C#.NET 3.5
此問題與描述類似問題的this post有關。希望這些例子有助於澄清該請求中的問題。
任何幫助或解決方案的見解將不勝感激。
謝謝傑克,我不好意思發佈當前版本的代碼。我已經發現了這一點,並嘗試沒有成功。實際上,這個遺漏在張貼問題時被發現。顯然舊版本被緩存了。 - 它適合你嗎? – SCote