2013-04-03 86 views
1

我正在接收消費者內部來自RabbitMQ的消息。我必須處理該消息並將處理後的消息發佈到不同的隊列中。我怎麼做到這一點?rabbitmq消費者成爲生產商

我的代碼是

using (IConnection connection = factory.CreateConnection()) 
{ 
    using (IModel channel = connection.CreateModel()) 
    { 
     if (!String.IsNullOrEmpty(EXCHANGE_NAME)) 
      channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, durable); 

     if (!String.IsNullOrEmpty(QUEUE_NAME)) 
      channel.QueueDeclare(QUEUE_NAME, false, false, false, null); 

     string data = ""; 
     EventingBasicConsumer consumer = new EventingBasicConsumer(); 
     consumer.Received += (o, e) => 
     { 
      //This is the received message 
      data = data + Encoding.ASCII.GetString(e.Body) + Environment.NewLine; 
      string processed_data = "processed data = " + data; 
      //I want to write some code here to post the processed message to a different queue. 
      //or other idea is "can I use duplex services? 

     }; 
     string consumerTag = channel.BasicConsume(QUEUE_NAME, true, consumer); 

     channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null); 
     channel.QueueUnbind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null); 
    } 
} 
+0

我的問題與此類似。 http://stackoverflow.com/questions/3972756/can-you-publish-a-message-while-processing-a-queue-in-rabbitmq-net-client?rq=1 – UltimateBigChill

+0

我寫了一個新的方法,通過處理消息作爲輸入字符串。在該方法內部,我創建了一個連接工廠,一個新模型並將消息發佈到另一個隊列中。這是我採取的方法。 – UltimateBigChill

回答

2

底線是,你可以線程之間共享的連接,而不是通道。因此,在您的例子中,你可以使用相同的連接,但你需要創建一個新的渠道,當你要發佈(因爲consumer.Received活動將在不同的線程中提出):

using (IConnection connection = factory.CreateConnection()) 
{ 
    using (IModel channel = connection.CreateModel()) 
    { 
     if (!String.IsNullOrEmpty(EXCHANGE_NAME)) 
      channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, durable); 

     if (!String.IsNullOrEmpty(QUEUE_NAME)) 
      channel.QueueDeclare(QUEUE_NAME, false, false, false, null); 

     string data = ""; 
     EventingBasicConsumer consumer = new EventingBasicConsumer(); 
     consumer.Received += (o, e) => 
     { 
      //This is the received message 
      data = data + Encoding.ASCII.GetString(e.Body) + Environment.NewLine; 
      string processed_data = "processed data = " + data; 
      //I want to write some code here to post the processed message to a different queue. 
      //or other idea is "can I use duplex services? 

      using (IModel channel = connection.CreateModel()) 
      { 
       channel.Publish(...); 
      } 

     }; 
     string consumerTag = channel.BasicConsume(QUEUE_NAME, true, consumer); 

     channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null); 
     channel.QueueUnbind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null); 

     // don't dispose of your channel until you've finished consuming 
    } 

    // don't dispose of your connection until you've finished consuming 
} 

製作確保你不想處理消費者渠道,直到你想停止消費。連接也是一樣。這是一個常見的錯誤。

+0

邁克感謝您的指點。那也正是我所關心的。不想最終陷入僵局或內存泄漏。 – UltimateBigChill