我正在尋找一種方法從兔死亡隊列隊列中取出物品並將其重新處理回隊列中。有沒有內置的方法來做到這一點?RabbitMQ重新處理死亡隊列隊列
0
A
回答
0
您可以像使用常規隊列一樣使用來自DLX(實際上來自DLQ)的消息。你提出的建議(從DLQ獲取消息並將它發佈到隊列中,它最初是從靜止的隊列中)可能會導致消息循環(通常它會)。
最佳實踐方式是以某種單獨的方式處理死鎖消息,或者根本不要盲注它們。
0
我沒有找到一個內置的方式來做到這一點,所以我創建了我自己的小解決方案。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.v0_9_1;
namespace RabbitMQReprocessDeadLetter
{
public class RabbitReprocessor
{
private readonly IModel _model;
private readonly string _deadLetterQueueName;
private const ushort FetchSize = 10;
private const string ConsumerName = "DeadLetterReprocessor";
public RabbitReprocessor(IConnection rabbitConnection, string deadLetterQueueName)
{
_deadLetterQueueName = deadLetterQueueName;
_model = rabbitConnection.CreateModel();
}
public void StartConsuming(CancellationTokenSource cancellationTokenSource = null)
{
// Configure the Quality of service for the model. Below is how what each setting means.
// BasicQos(0="Dont send me a new message untill I’ve finshed", _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
_model.BasicQos(0, FetchSize, false);
var queueingBasicConsumer = new QueueingBasicConsumer(_model);
_model.BasicConsume(_deadLetterQueueName, false, ConsumerName, queueingBasicConsumer);
while (true)
{
if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested)
{
return;
}
var e = queueingBasicConsumer.Queue.Dequeue(); // blocking call
var deathProperties = (List<object>) e.BasicProperties.Headers["x-death"];
var prop = (Dictionary<string, object>)deathProperties.Single();
var queueAsByteArray = (byte[])prop["queue"];
var queueName = queueAsByteArray.ConvertToString();
var data = e.Body;
try
{
Console.WriteLine("{0} => {1}", queueName, data.Deserialize<long>());
}
// ReSharper disable once EmptyGeneralCatchClause
catch { }
SendMessageToQueue(queueName, data);
_model.BasicAck(e.DeliveryTag, false);
}
}
/// <summary>
/// delivery the message directly into the queue from which it came.
///
/// You may want to put it back into an exchange instead of a queue.
/// </summary>
private void SendMessageToQueue(string queueName, byte[] messageBytes)
{
const string exchangeName = "";
if (string.IsNullOrEmpty(queueName))
{
throw new ArgumentNullException("queueName");
}
if (messageBytes == null)
{
throw new ArgumentNullException("messageBytes");
}
var basicProperties = new BasicProperties
{
DeliveryMode = 2//2 = durable
};
_model.BasicPublish(exchangeName, queueName, basicProperties, messageBytes);
}
}
}
完整的解決方案可以在這裏找到:
的情況下相關問題
- 1. RabbitMQ死信交換/隊列
- 2. 使用RabbitMQ逐一處理隊列
- 3. Python隊列似乎正在死亡
- 4. RabbitMQ隊列訂單管理
- 5. RabbitMQ - parellel隊列
- 6. RabbitMQ隊列窺探
- 7. Rabbitmq隊列分片
- 8. RabbitMQ隊列組織
- 9. 使用MSMQ死信隊列或管理隊列處理未發送的消息?
- 10. 死信隊列和毒物隊列
- 11. 並行處理多個rabbitmq隊列的Spark Streaming處理
- 12. RabbitMQ/AMQP HA App:消費者死亡/退出後的持續隊列?
- 13. 死信隊列和退隊隊列有什麼區別?
- 14. 去隊列處理失敗後重試
- 15. 定義隊列中的RabbitMQ
- 16. 芹菜+ RabbitMQ的空隊列
- 17. RabbitMQ:檢查隊列存在
- 18. Rabbitmq鏡像隊列性能
- 19. RabbitMQ掛起隊列消耗
- 20. 的RabbitMQ AMQP隊列設計
- 21. RabbitMQ,EasyNetQ隊列名稱
- 22. NServiceBus不創建RabbitMQ隊列
- 23. RabbitMQ - 非持久隊列
- 24. RabbitMQ - 消耗多個隊列
- 25. RabbitMQ和隊列數據
- 26. 多處理和隊列
- 27. 如何處理隊列?
- 28. Nodejs sqs隊列處理器
- 29. 多處理隊列已滿
- 30. 線程/多處理/隊列?
那裏是沒有錯的信息,但我們在處理一個錯誤(如丟失連接等)它會方便重新發送消息並嘗試重新處理。我同意,如果死信將所有事情自動回饋給大多數情況下會導致無限循環。 – jhilden
使用消息確認可防止消費者意外失敗時丟失消息。 – pinepain
Zaq,我們克服了失敗,使他們不會永遠陷入隊列中。我喜歡你的概念,我們只需要確保那些真正應該被忽略的東西,而且可以重新嘗試的東西仍然存在。 – jhilden