@ mookid8000: 好吧,這使得它十分清楚,當消息ACK
版或NACK
版。
但是我能夠反駁你的假設。我在單獨的控制檯應用程序中綁定到相同的輸入隊列創建了兩個使用者其中一個總是拋出異常。發佈者每10秒發送一條消息。我可以看到,其他使用者在故障拋出異常之後處理消息。我明白了,因爲消息的ID是由Rebus在控制檯中記錄的。
這回答我的問題,但沒有解決我的問題。就我而言,我真正想要的是可以讓消息保留在隊列中,直到其中一個服務實例能夠處理它爲止。原因是消息的順序很重要。這可能是我的方法中的一個根本性錯誤,但現在我不想改變這一點。
是否有辦法阻止(某些)消息移動到錯誤隊列? Rebus的二級重試機制是否可以實現這一目標?
有關進一步參考下面的源代碼:
普通消費者:
class TimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
Console.WriteLine(message.Time);
//await Program.Activator.Bus.Reply("Ja danke");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new TimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:[email protected]",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
// Without the unsubscribe we have a durable subscriber, see http://www.enterpriseintegrationpatterns.com/patterns/messaging/DurableSubscription.html
//bus.Unsubscribe<TimeEvent>();
}
}
}
故障消費者:
class FaultedTimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
throw new Exception("That should not have happened");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new FaultedTimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:[email protected]",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
出版商:
public static class PubSubTest
{
public static void Start()
{
Console.WriteLine("Starting PubSubTest");
using (var activator = new BuiltinHandlerActivator())
{
var bus = Configure
.With(activator)
.Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", "MessagingTest").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
.Subscribe(_ => bus.Publish(new TimeEvent(DateTime.Now)).Wait());
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
更新26.07.17: 我的結果與二級重試:激活它們後,我通過推遲和重新發送它們來處理它們。通過這樣做,我至少可以確保消息稍後處理,而不會丟失。
public async Task Handle(IFailed<TimeEvent> failedMessage)
{
await bus.Defer(TimeSpan.FromSeconds(30), failedMessage.Message);
}
這不是一個最佳的解決方案:
- 順序我的消息被改變:消息被延遲後,下一個消息從隊列消耗。
- Timeoutmanager是內存中的,我無權訪問SQL-Server。
我讀到可以在RabbitMQ中使用延遲消息。您可以使用dead-letter-exchanges或plugin。
將maxDeliveryAttempts增加到Int32.MaxValue
非常骯髒,但按照我的要求:它保留消息,最重要的是消息在隊列中的順序。
我將此問題標記爲已解決,因爲「如何在Rebus中中止交易」這個問題已得到解答。
我通過從傳入處理程序並調用終止處理的IMessageContext中取出TransactionContext來終止事務,如我的代碼示例所示。我也嘗試過拋出異常,但最終導致Rebus在一段時間後將消息移入錯誤隊列。如果由於某種原因,當時沒有其他相同服務的實例正在運行,就會發生這種情況。 –