2011-09-07 35 views
0

最近,我一直在嘗試讓Reply-To模式在Apache NMS/ActiveMQ中工作,並且一直只有使用臨時隊列的名稱。僅使用ReplyTo Name發送響應到ActiveMQ臨時隊列

該項目是調度服務,它從總線檢索請求並將它們發送到另一個進程/運行時(基於複雜的路由標準)來處理請求。然後,這個單獨的處理器使用回覆隊列名稱和關聯ID來製作響應,並將其發送給同一代理中的原始請求者,但是是不同的連接。

問題是,如果您在消息的NMSReplyTo標頭中擁有IDestination對象引用,則只能發送到臨時隊列(或主題)。如果該引用丟失,則無法通過簡單地使用其名稱將消息發送到臨時隊列(或主題)。

說明這個問題就是這樣簡單的「Pong」服務,它偵聽消息隊列並使用NMS Reply-To頭的內容向請求者發出響應。它通過簡單地調用ProcessMessage(string,string)方法來模擬將請求分派給另一個進程。

using System; 
    using Apache.NMS; 

    namespace PongService 
    { 
     /// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary> 
     class PongService 
     { 
      static ISession session = null; 
      static IMessageProducer producer = null; 

      public static void Main(string[] args) 
      { 
       Uri connecturi = new Uri("activemq:tcp://localhost:61616"); 
       Console.WriteLine("Connecting to " + connecturi); 

       IConnectionFactory factory = new NMSConnectionFactory(connecturi); 
       IConnection connection = factory.CreateConnection(); 
       session = connection.CreateSession(); 

       IDestination destination = session.GetQueue("PONG.CMD"); 
       Console.WriteLine("Using destination: " + destination); 

       producer = session.CreateProducer(null); 

       IMessageConsumer consumer = session.CreateConsumer(destination); 

       connection.Start(); 

       consumer.Listener += new MessageListener(OnMessage); 

       Console.WriteLine("Press any key to terminate Pong service . . ."); 

       // loop until a key is pressed 
       while (!Console.KeyAvailable) 
       { 
        try { System.Threading.Thread.Sleep(50); } 
        catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); } 
       } // loop 

       Console.Write("Closing connection..."); 
       consumer.Close(); 
       producer.Close(); 
       session.Close(); 
       connection.Close(); 
       Console.WriteLine("done."); 
      } 


      /// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary> 
      /// <param name="receivedMsg">The message received on the request queue.</param> 
      protected static void OnMessage(IMessage receivedMsg) 
      { 
       // mimic the operation of passing this request to an external processor which can connect 
       // to the broker but will not have references to the session objects including destinations 
       Console.WriteLine("Sending request to an external processor"); 
       ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString()); 
      } 


      /// <summary>Models a worker in another process/runtime.</summary> 
      /// <param name="queuename">Where to send the results of processing</param> 
      /// <param name="crid">Correlation identifier of the request.</param> 
      protected static void ProcessMessage(string queuename, string crid) 
      { 
       ITextMessage response = session.CreateTextMessage("Pong!"); 
       response.NMSCorrelationID = crid; 

       IDestination destination = session.GetQueue(queuename); 

       Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'"); 
       try 
       { 
        producer.Send(destination, response); 
       } 
       catch (Exception ex) 
       { 
        Console.Error.WriteLine("Could not send response: " + ex.Message); 
       } 

      } 

     } 

    } 

現在爲客戶端。它只是創建一個臨時隊列,開始監聽它,然後在我們的「Pong」服務正在偵聽的隊列上發送一個請求。請求消息包含臨時隊列的IDestination。

using System; 
    using System.Threading; 
    using Apache.NMS; 
    using Apache.NMS.Util; 

    namespace PongClient 
    { 
     class PongClient 
     { 
      protected static AutoResetEvent semaphore = new AutoResetEvent(false); 
      protected static ITextMessage message = null; 
      protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3); 

      public static void Main(string[] args) 
      { 
       Uri connecturi = new Uri("activemq:tcp://localhost:61616"); 
       Console.WriteLine("About to connect to " + connecturi); 

       IConnectionFactory factory = new NMSConnectionFactory(connecturi); 

       IConnection connection = factory.CreateConnection(); 
       ISession session = connection.CreateSession(); 

       IDestination temporaryDestination = session.CreateTemporaryQueue(); 
       Console.WriteLine("Private destination: " + temporaryDestination); 

       IDestination destination = session.GetQueue("PONG.CMD"); 
       Console.WriteLine("Service destination: " + destination); 


       IMessageConsumer consumer = session.CreateConsumer(destination); 
       consumer.Listener += new MessageListener(OnMessage); 

       IMessageProducer producer = session.CreateProducer(destination); 

       connection.Start(); 

       // Send a request message 
       ITextMessage request = session.CreateTextMessage("Ping"); 
       request.NMSCorrelationID = Guid.NewGuid().ToString(); 
       request.NMSReplyTo = temporaryDestination; 
       producer.Send(request); 

       // Wait for the message 
       semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true); 
       if (message == null) 
       { 
        Console.WriteLine("Timed-Out!"); 
       } 
       else 
       { 
        Console.WriteLine("Received message with ID: " + message.NMSMessageId); 
        Console.WriteLine("Received message with text: " + message.Text); 
       } 
      } 



      protected static void OnMessage(IMessage receivedMsg) 
      { 
       message = receivedMsg as ITextMessage; 
       semaphore.Set(); 
      } 
     } 
    } 

Pong過程似乎正常工作,只是它正在製作一個全新的,與Reply-To頭中指定的隊列分離的隊列。

這裏所涉及的技術的版本:

  • Apache.NMS.ActiveMQ V1.5.1
  • Apache.NMS API V1.5.0
  • 的ActiveMQ 5.5.0
  • C#.NET 3.5

此問題與描述類似問題的this post有關。希望這些例子有助於澄清該請求中的問題。

任何幫助或解決方案的見解將不勝感激。

回答

1

你實際上並沒有在來自PongClient的請求消息中設置回覆頭。

試試這個:

ITextMessage request = session.CreateTextMessage("Ping"); 
request.NMSCorrelationID = Guid.NewGuid().ToString(); 
request.NMSReplyTo = temporaryDestination; 
producer.Send(request); 
+0

謝謝傑克,我不好意思發佈當前版本的代碼。我已經發現了這一點,並嘗試沒有成功。實際上,這個遺漏在張貼問題時被發現。顯然舊版本被緩存了。 - 它適合你嗎? – SCote

0

您需要使用您IDestination傳遞。

調用

IDestination destination = session.GetQueue(queuename); 

是有點邪。在調用CreateTemporaryQueue()時,它會在不通知您的情況下用新的相同名稱替換現有的臨時隊列。

+0

我正在使用IBM.XMS 8.0.0.4,並且在ISession接口中沒有稱爲GetQueue的方法 – Erez

0

我會建議使用主題作爲回覆目標,並根據NMSCorrelationID設置您的使用者過濾器。這是我在臨時隊列受挫之後所實施的實現。它實際上有很多優點。

  1. 它減少了服務器上的密集資源使用(不需要構建/解構臨時隊列)。
  2. 它允許您使用另一個使用者來監視發回的響應(您將永遠無法在臨時隊列中「偷看」)。
  3. 它更可靠,因爲可以通過邏輯名稱傳遞主題,而不是通過特定的令牌標識(跨連接丟失)。
相關問題