2014-07-15 78 views

回答

0

您可以像使用常規隊列一樣使用來自DLX(實際上來自DLQ)的消息。你提出的建議(從DLQ獲取消息並將它發佈到隊列中,它最初是從靜止的隊列中)可能會導致消息循環(通常它會)。

最佳實踐方式是以某種單獨的方式處理死鎖消息,或者根本不要盲注它們。

+0

那裏是沒有錯的信息,但我們在處理一個錯誤(如丟失連接等)它會方便重新發送消息並嘗試重新處理。我同意,如果死信將所有事情自動回饋給大多數情況下會導致無限循環。 – jhilden

+0

使用消息確認可防止消費者意外失敗時丟失消息。 – pinepain

+0

Zaq,我們克服了失敗,使他們不會永遠陷入隊列中。我喜歡你的概念,我們只需要確保那些真正應該被忽略的東西,而且可以重新嘗試的東西仍然存在。 – jhilden

0

我沒有找到一個內置的方式來做到這一點,所以我創建了我自己的小解決方案。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Framing.v0_9_1; 

namespace RabbitMQReprocessDeadLetter 
{ 
    public class RabbitReprocessor 
    { 
     private readonly IModel _model; 
     private readonly string _deadLetterQueueName; 
     private const ushort FetchSize = 10; 
     private const string ConsumerName = "DeadLetterReprocessor"; 

     public RabbitReprocessor(IConnection rabbitConnection, string deadLetterQueueName) 
     { 
      _deadLetterQueueName = deadLetterQueueName; 
      _model = rabbitConnection.CreateModel(); 
     } 

     public void StartConsuming(CancellationTokenSource cancellationTokenSource = null) 
     { 
      // Configure the Quality of service for the model. Below is how what each setting means. 
      // BasicQos(0="Dont send me a new message untill I’ve finshed", _fetchSize = "Send me N messages at a time", false ="Apply to this Model only") 
      _model.BasicQos(0, FetchSize, false); 

      var queueingBasicConsumer = new QueueingBasicConsumer(_model); 
      _model.BasicConsume(_deadLetterQueueName, false, ConsumerName, queueingBasicConsumer); 

      while (true) 
      { 
       if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested) 
       { 
        return; 
       } 

       var e = queueingBasicConsumer.Queue.Dequeue(); // blocking call 
       var deathProperties = (List<object>) e.BasicProperties.Headers["x-death"]; 
       var prop = (Dictionary<string, object>)deathProperties.Single(); 
       var queueAsByteArray = (byte[])prop["queue"]; 
       var queueName = queueAsByteArray.ConvertToString(); 
       var data = e.Body; 
       try 
       { 
        Console.WriteLine("{0} => {1}", queueName, data.Deserialize<long>()); 
       } 
       // ReSharper disable once EmptyGeneralCatchClause 
       catch { } 
       SendMessageToQueue(queueName, data); 
       _model.BasicAck(e.DeliveryTag, false); 
      } 
     } 

     /// <summary> 
     /// delivery the message directly into the queue from which it came. 
     /// 
     /// You may want to put it back into an exchange instead of a queue. 
     /// </summary> 
     private void SendMessageToQueue(string queueName, byte[] messageBytes) 
     { 
      const string exchangeName = ""; 
      if (string.IsNullOrEmpty(queueName)) 
      { 
       throw new ArgumentNullException("queueName"); 
      } 
      if (messageBytes == null) 
      { 
       throw new ArgumentNullException("messageBytes"); 
      } 
      var basicProperties = new BasicProperties 
      { 
       DeliveryMode = 2//2 = durable 
      }; 
      _model.BasicPublish(exchangeName, queueName, basicProperties, messageBytes); 
     } 
    } 
} 

完整的解決方案可以在這裏找到:

https://github.com/jayhilden/RabbitMQReprocessDeadLetter

的情況下