2011-07-14 46 views
6

我是RabbitMQ的新手。當有多個隊列時(讀取),我希望能夠處理閱讀消息而不會阻塞。關於我如何做到這一點的任何投入?從多個隊列讀取,RabbitMQ

//編輯1個

public class Rabbit : IMessageBus 
{ 

    private List<string> publishQ = new List<string>(); 
    private List<string> subscribeQ = new List<string>(); 

    ConnectionFactory factory = null; 
    IConnection connection = null; 
    IModel channel = null; 
    Subscription sub = null; 

    public void writeMessage(Measurement m1) { 
     byte[] body = Measurement.AltSerialize(m1); 
     int msgCount = 1; 
     Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id); 

     string finalQueue = publishToQueue(m1.id); 

     while (msgCount --> 0) { 
      channel.BasicPublish("amq.direct", finalQueue, null, body); 
     } 

     Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id); 
    } 

    public string publishToQueue(string firstQueueName) { 
     Console.WriteLine("Creating a queue and binding it to amq.direct"); 
     string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null); 
     channel.QueueBind(queueName, "amq.direct", queueName, null); 
     Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName); 
     return queueName; 
    } 


    public Measurement readMessage() { 
     Console.WriteLine("Receiving message..."); 
     Measurement m = new Measurement(); 

     int i = 0; 
     foreach (BasicDeliverEventArgs ev in sub) { 
      m = Measurement.AltDeSerialize(ev.Body); 
      //m.id = //get the id here, from sub 
      if (++i == 1) 
       break; 
      sub.Ack(); 
     } 

     Console.WriteLine("Done.\n"); 
     return m; 
    } 


    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
    } 

    public static string MsgSysName; 
    public string MsgSys 
    { 
     get 
     { 
      return MsgSysName; 
     } 
     set 
     { 
      MsgSysName = value; 
     } 
    } 

    public Rabbit(string _msgSys) //Constructor 
    { 
     factory = new ConnectionFactory(); 
     factory.HostName = "localhost"; 
     connection = factory.CreateConnection(); 
     channel = connection.CreateModel(); 
     //consumer = new QueueingBasicConsumer(channel); 

     System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
     MsgSys = _msgSys; 
    } 

    ~Rabbit() 
    { 
     //observer?? 
     connection.Dispose(); 
     //channel.Dispose(); 
     System.Console.WriteLine("\nDestroying RABBIT"); 
    } 
} 

//編輯2個

private List<Subscription> subscriptions = new List<Subscription>(); 
    Subscription sub = null; 

public Measurement readMessage() 
    { 
     Measurement m = new Measurement(); 
     foreach(Subscription element in subscriptions) 
     { 
      foreach (BasicDeliverEventArgs ev in element) { 
       //ev = element.Next(); 
       if(ev != null) { 
        m = Measurement.AltDeSerialize(ev.Body); 
        return m; 
       } 
       m = null; 
      }   
     } 
     System.Console.WriteLine("No message in the queue(s) at this time."); 
     return m; 
    } 

    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
     subscriptions.Add(sub);  
    } 

//編輯3

//MessageHandler.cs 

public class MessageHandler 
{ 
    // Implementation of methods for Rabbit class go here 
    private List<string> publishQ = new List<string>(); 
    private List<string> subscribeQ = new List<string>(); 

    ConnectionFactory factory = null; 
    IConnection connection = null; 
    IModel channel = null; 
    QueueingBasicConsumer consumer = null; 

    private List<Subscription> subscriptions = new List<Subscription>(); 
    Subscription sub = null; 

    public void writeMessage (Measurement m1) 
    { 
     byte[] body = Measurement.AltSerialize(m1); 
     //declare a queue if it doesn't exist 
     publishToQueue(m1.id); 

     channel.BasicPublish("amq.direct", m1.id, null, body); 
     Console.WriteLine("\n [x] Sent to queue {0}.", m1.id); 
    } 

    public void publishToQueue(string queueName) 
    { 
     string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null); 
     channel.QueueBind(finalQueueName, "amq.direct", "", null); 
    } 

    public Measurement readMessage() 
    { 
     Measurement m = new Measurement(); 
     foreach(Subscription element in subscriptions) 
     { 
      if(element.QueueName == null) 
      { 
       m = null; 
      } 
      else 
      { 
       BasicDeliverEventArgs ev = element.Next(); 
       if(ev != null) { 
        m = Measurement.AltDeSerialize(ev.Body); 
        m.id = element.QueueName; 
        element.Ack(); 
        return m; 
       } 
       m = null;      
      } 
      element.Ack(); 
     } 
     System.Console.WriteLine("No message in the queue(s) at this time."); 
     return m; 
    } 

    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
     subscriptions.Add(sub); 
    } 

    public static string MsgSysName; 
    public string MsgSys 
    { 
     get 
     { 
      return MsgSysName; 
     } 
     set 
     { 
      MsgSysName = value; 
     } 
    } 

    public MessageHandler(string _msgSys) //Constructor 
    { 
     factory = new ConnectionFactory(); 
     factory.HostName = "localhost"; 
     connection = factory.CreateConnection(); 
     channel = connection.CreateModel(); 
     consumer = new QueueingBasicConsumer(channel); 

     System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
     MsgSys = _msgSys; 
    } 

    public void disposeAll() 
    { 
     connection.Dispose(); 
     channel.Dispose(); 
     foreach(Subscription element in subscriptions) 
     { 
      element.Close(); 
     } 
     System.Console.WriteLine("\nDestroying RABBIT"); 
    } 
} 

//App1.cs

using System; 
using System.IO; 

using UtilityMeasurement; 
using UtilityMessageBus; 


public class MainClass 
{ 
    public static void Main() 
    { 

    MessageHandler obj1 = MessageHandler("Rabbit"); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    //Create new Measurement messages 
    Measurement m1 = new Measurement("q1", 2345, 23.456); 
    Measurement m2 = new Measurement("q2", 222, 33.33); 

    System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id); 
    System.Console.WriteLine(" Time: {0}", m1.time); 
    System.Console.WriteLine(" Value: {0}", m1.value); 

    System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id); 
    System.Console.WriteLine(" Time: {0}", m2.time); 
    System.Console.WriteLine(" Value: {0}", m2.value); 

    // Ask queue name and store it 
    System.Console.WriteLine("\nName of queue to publish to: "); 
    string queueName = (System.Console.ReadLine()).ToString(); 
    obj1.publishToQueue(queueName); 

    // Write message to the queue 
    obj1.writeMessage(m1);  

    System.Console.WriteLine("\nName of queue to publish to: "); 
    string queueName2 = (System.Console.ReadLine()).ToString(); 
    obj1.publishToQueue(queueName2); 

    obj1.writeMessage(m2); 

    obj1.disposeAll(); 
} 
} 

//App2.cs

using System; 
using System.IO; 

using UtilityMeasurement; 
using UtilityMessageBus; 

public class MainClass 
{ 
    public static void Main() 
    { 
    //Asks for the message system 
    System.Console.WriteLine("\nEnter name of messageing system: "); 
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]"); 
    string MsgSysName = (System.Console.ReadLine()).ToString(); 

    //Declare an IMessageBus instance: 
    //Here, an object of the corresponding Message System 
     // (ex. Rabbit, Zmq, etc) is instantiated 
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    //Create a new Measurement object m 
    Measurement m = new Measurement(); 

    System.Console.WriteLine("Queue name to subscribe to: "); 
    string QueueName1 = (System.Console.ReadLine()).ToString(); 
    obj1.subscribeToQueue(QueueName1); 

    //Read message into m 
    m = obj1.readMessage(); 

    if (m != null) { 
     System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); 
     System.Console.WriteLine(" Time: {0}", m.time); 
     System.Console.WriteLine(" Value: {0}", m.value); 
    } 

    System.Console.WriteLine("Another queue name to subscribe to: "); 
    string QueueName2 = (System.Console.ReadLine()).ToString(); 
    obj1.subscribeToQueue(QueueName2); 

    m = obj1.readMessage(); 

    if (m != null) { 
     System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); 
     System.Console.WriteLine(" Time: {0}", m.time); 
     System.Console.WriteLine(" Value: {0}", m.value); 
    } 

    obj1.disposeAll(); 
} 
} 

回答

12

在這兩種信息來源:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. 你確實應該嘗試先了解例子。

    • %程序Files%\ RabbitMQ的\ DotNetClient \例子\ SRC(基本的例子)

    • 從他們的Mercurial庫(C#項目)完整的工作的例子。

有用的操作就明白了:

  • 申報/斷言/聽/訂閱/發佈

回覆: 你的問題 - 有沒有理由,你可以」沒有多個聽衆。或者你可以在一個「交易所」上與一個聽衆訂閱n個路由路徑。

**重:無阻塞**

典型listenner消費消息一次一個。您可以將它們從隊列中拉出來,或者它們將以'窗口'方式(通過服務質量qos參數定義)自動放置在用戶附近。這種方法的優點在於爲您完成了許多艱苦的工作(重新:可靠性,保證交付等)。

RabbitMQ的一個關鍵特性是,如果在處理過程中出現錯誤,則將消息重新添加到隊列中(容錯功能)。

需要了解更多關於你的情況。

通常,如果您發佈到上面提到的列表中,您可以在RabbitMQ的工作人員身上找到某人。他們非常有幫助。

希望有所幫助。開始時你的頭腦很好,但值得堅持下去。


Q &一個

見:http://www.rabbitmq.com/faq.html

問:您可以訂閱多個隊列使用新的訂閱(通道,QUEUENAME)?

是的。您可以使用綁定鍵,例如abc。*。hij或abc。#。hij,或者您附加多個綁定。前者假定您已經圍繞某種適合您的原則設計了路由鍵(請參閱FAQ中的路由鍵)。對於後者,您需要綁定到多個隊列。

手動實現n綁定。 請參閱:http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

這種模式背後沒有太多的代碼,所以如果通配符不夠,您可以推出自己的訂閱模式。你可以從這個類繼承,並添加另一個方法來額外綁定...可能這會工作或接近這個(未經測試)。

的AQMP規範說多次手動結合是可能的:http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

Q. And if so, how can I go through all the subscribed queues and return a message (null when no messages)?

隨着當有可用消息通知您的訂戶。否則,你所描述的是一個拉接口,你可以根據請求拉取消息。如果沒有可用的消息,你會得到一個空你想要的。 btw:Notify方法可能更方便。

Q. Oh, and mind you that that I have all this operations in different methods. I will edit my post to reflect the code

直播代碼:

這個版本必須使用通配符來訂閱多個路由使用訂閱密鑰

ň手動佈線鍵就留給讀者自己練習。 ;-)我認爲你無論如何都傾向於拉取界面。順便說一句:拉接口效率低於通知。

 using (Subscription sub = new Subscription(ch, QueueNme)) 
     { 
      foreach (BasicDeliverEventArgs ev in sub) 
      { 
       Process(ev.Body); 

     ... 

注:的foreach使用的IEnumerable,和IEnumerable包裝了一個新的消息已經通過「產量」語句到達的事件。實際上它是一個無限循環。

--- UPDATE

AMQP的設計採用了TCP保持連接數低的應用程序數量的想法,所以這意味着你可以有每個連接多種渠道。

這個問題(編輯3)中的代碼試圖使用兩個訂閱者一個通道,而它應該(我相信),每個線程每個通道一個用戶,以避免鎖定問題。 Sugestion:使用路由鍵「通配符」。可以使用java客戶端訂閱多個不同的隊列名稱,但是.net客戶端在我的知識中並沒有在訂閱服務器類中實現。

如果您確實需要在同一個訂閱線程上有兩個不同的隊列名稱,則建議使用以下拉取序列。網:

 using (IModel ch = conn.CreateModel()) { // btw: no reason to close the channel afterwards IMO 
      conn.AutoClose = true;     // no reason to closs the connection either. Here for completeness. 

      ch.QueueDeclare(queueName); 
      BasicGetResult result = ch.BasicGet(queueName, false); 
      if (result == null) { 
       Console.WriteLine("No message available."); 
      } else { 
       ch.BasicAck(result.DeliveryTag, false); 
       Console.WriteLine("Message:"); 
      } 

      return 0; 
     } 

- 更新2:

從RabbitMQ的列表:

「假設element.Next()被阻塞其中一個預訂 你可以從每個訂閱獲取交付超時時間爲 ,或者您可以設置一個隊列來接收所有測量結果,並通過一次訂閱從中檢索消息。「 (Emile)

這意味着當第一個隊列爲空時,.Next()塊會等待下一個消息出現。即,用戶有一個等待換下一消息內置

- UPDATE 3:

下.NET,使用QueueingBasicConsumer用於從多個隊列中的消耗。

其實這裏有一個關於它的線程來獲得使用感受:

Wait for a single RabbitMQ message with a timeout

- UPDATE4:

在.QueueingBasicConsumer

一些更多的信息這裏有示例代碼。

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

例如複製到具有一些修改的應答(參見// < -----)。更新5:一個簡單的獲取,將作用於任何隊列(一個較慢,但有時更方便的方法)。

  ch.QueueDeclare(queueName); 
      BasicGetResult result = ch.BasicGet(queueName, false); 
      if (result == null) { 
       Console.WriteLine("No message available."); 
      } else { 
       ch.BasicAck(result.DeliveryTag, false); 
       Console.WriteLine("Message:"); 
       // deserialize body and display extra info here. 
      } 
+0

非常感謝您的反饋。我仍在學習消息系統,還有一些操作我仍然不明白。喜歡聽。我也看到了rabbitmq如何訂閱一個隊列。你能訂閱多個使用新訂閱(通道,隊列名)的隊列嗎?如果是這樣,我如何通過所有訂閱的隊列並返回一條消息(在沒有消息時爲null)?哦,並且介意我以不同的方法進行所有這些操作。我將編輯我的帖子以反映代碼。 – Demi

+0

再次感謝。我編輯了上面的訂閱和寫入功能的代碼。但是,我有這個運行時錯誤:如果我訂閱說兩個隊列,並嘗試讀取消息,我只能第一次回顧消息。我看不出它在哪裏搞砸了。你可以看一下,如果我? – Demi

+0

@Demi ...採取了一些狩獵。我認爲您在閱讀器循環結束時缺少「subscriptions.Ack()」?這意味着'我已經成功處理了這封郵件,所以給我下一封郵件。'讓我知道如果是這樣。否則你看起來很近。 – sgtz

1

最簡單的方法是使用EventingBasicConsumer。我在我的網站上有一個關於如何使用它的例子。 RabbitMQ EventingBasicConsumer

此Consumer類公開可以使用的接收事件,因此不會阻止。其餘代碼基本保持不變。

相關問題