2011-10-31 108 views
9

「持久」和「持久模式」似乎與重新啓動有關,而不涉及沒有訂戶接收消息。沒有訂閱者的RabbitMQ隊列

我希望RabbitMQ在沒有訂戶的情況下將消息保留在隊列中。當用戶上線時,該用戶應收到該消息。 RabbitMQ可以嗎?

代碼示例:

服務器:

namespace RabbitEg 
{ 
    class Program 
    { 
     private const string EXCHANGE_NAME = "helloworld"; 

     static void Main(string[] args) 
     { 
      ConnectionFactory cnFactory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" }; 

      using (IConnection cn = cnFactory.CreateConnection()) 
      { 
       using (IModel channel = cn.CreateModel()) 
       { 
        //channel.ExchangeDelete(EXCHANGE_NAME); 
        channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true); 
        //channel.BasicReturn += new BasicReturnEventHandler(channel_BasicReturn); 

        for (int i = 0; i < 100; i++) 
        { 
         byte[] payLoad = Encoding.ASCII.GetBytes("hello world _ " + i); 
         IBasicProperties channelProps = channel.CreateBasicProperties(); 
         channelProps.SetPersistent(true); 

         channel.BasicPublish(EXCHANGE_NAME, "routekey_helloworld", false, false, channelProps, payLoad); 

         Console.WriteLine("Sent Message " + i); 
         System.Threading.Thread.Sleep(25); 
        } 

        Console.ReadLine(); 
       } 
      } 
     } 
    } 
} 

客戶:

namespace RabbitListener 
{ 
    class Program 
    { 
     private const string EXCHANGE_NAME = "helloworld"; 

     static void Main(string[] args) 
     { 
      ConnectionFactory cnFactory = new ConnectionFactory() { HostName = "localhost" }; 

      using (IConnection cn = cnFactory.CreateConnection()) 
      { 
       using (IModel channel = cn.CreateModel()) 
       { 
        channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true); 

        string queueName = channel.QueueDeclare("myQueue", true, false, false, null); 
        channel.QueueBind(queueName, EXCHANGE_NAME, "routekey_helloworld"); 

        Console.WriteLine("Waiting for messages"); 

        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
        channel.BasicConsume(queueName, true, consumer); 

        while (true) 
        { 
         BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
         Console.WriteLine(Encoding.ASCII.GetString(e.Body)); 
        } 
       } 
      } 
     } 
    } 
} 

回答

12

對於什麼durablepersistent意味着一個解釋,請參閱AMQP Reference

基本上,隊列要麼是durable要麼是non-durable。前者經紀人重新生存,後者不。

消息發佈爲transientpersistent。這個想法是,隊列上的persistent消息也應該能夠在代理重新啓動的情況下生存。

所以,要得到你想要的,你需要1)聲明隊列爲durable和2)發佈消息爲persistent。另外,您可能還希望在頻道上啓用發佈商確認;這樣,你就會知道經紀人何時承擔了該消息的責任。

+0

謝謝,我已經試過這個,但是如果沒有客戶端收聽,它仍然不會保留消息。代碼示例附加到問題。 –

+0

不錯的代碼。兩個問題:1)服務器還應該*聲明隊列;聲明它兩次不是問題,這是一個好習慣,並且2)queueDeclare()爲您提供了一個匿名的非持久隊列;你想要queueDeclare(「myQueue」,true,false,false,null)。 – scvalex

+0

此外,您編輯問題的方式使您很難理解您要實現的目標。 – scvalex