2017-07-13 45 views
1

我在嘗試中止消息處理程序中的事務時很掙扎。我正在使用RabbitMQ如何中止rebus中的交易?

我的目標是具有以下行爲:如果收到消息,我嘗試將其內容存儲在硬盤上。如果失敗了,我想重新輸入郵件。通過這樣做,我給同一服務的另一個實例提供了嘗試相同的機會。 我想要的基本上是控制消息何時被編輯或拒絕的可能性。

我已經查看了源代碼,特別是RabbitMqTransport.cs,發現當交易提交時發送了一個ACK。如果交易被中止,則發送NACK。我曾經自己圍繞RabbitMQ創建了一個包裝類,因此知道這是正確的。

但是,似乎OnAborted永遠不會被調用。即使我放棄交易,也會調用OnComitted

我使用以下代碼來中止事務。 contextIMessageContext實例傳入Messagehandler

context.TransactionContext.OnAborted(() => 
{ 
    Console.WriteLine("Abort"); 
}); 

context.TransactionContext.OnCommitted(async() => 
{ 
    Console.WriteLine("Commit"); 
}); 

context.TransactionContext.Abort(); 

我也試過的這個不同的變化,如獲得AmbientTransactionContext或使用Rebus.TransactionScope包,沒有效果。

回答

0

根據消息處理程序是否引發異常,Rebus被設計爲可以處理ACK/NACK。

換句話說,如果您的消息處理程序不會引發異常,則認爲消息傳遞成功,並且消息被確認。

另一方面,如果您的消息處理程序拋出異常,則消息將被NACK(並因此將其狀態重置爲RabbitMQ中的「就緒」),從而有效地使其他實例可以獲取該消息。

但是,由於RabbitMQ客戶端驅動程序的設計方式,我相信該消息實際上不會返回到服務器以供其他實例接收 - 我認爲(這只是一個猜測),事​​實驅動程序預取消息並將它們存儲在內存隊列中,直到它們被消耗,導致消息被簡單地標記爲內部重新傳送。

因此,我希望相同的服務實例執行所有的傳遞嘗試,然後 - 正如您已經正確觀察到的那樣 - Rebus將消息移動到錯誤隊列中,從而安全地存儲它以便稍後處理。

我希望是有道理的:)

0

從您的文章中不清楚的是如何中止交易。我們使用與RabbitMq類似的設置,我總是從處理程序內部拋出一個異常。

+0

我通過從傳入處理程序並調用終止處理的IMessageContext中取出TransactionContext來終止事務,如我的代碼示例所示。我也嘗試過拋出異常,但最終導致Rebus在一段時間後將消息移入錯誤隊列。如果由於某種原因,當時沒有其他相同服務的實例正在運行,就會發生這種情況。 –

0

@ mookid8000: 好吧,這使得它十分清楚,當消息ACK版或NACK版。

但是我能夠反駁你的假設。我在單獨的控制檯應用程序中綁定到相同的輸入隊列創建了兩個使用者其中一個總是拋出異常。發佈者每10秒發送一條消息。我可以看到,其他使用者在故障拋出異常之後處理消息。我明白了,因爲消息的ID是由Rebus在控制檯中記錄的。

這回答我的問題,但沒有解決我的問題。就我而言,我真正想要的是可以讓消息保留在隊列中,直到其中一個服務實例能夠處理它爲止。原因是消息的順序很重要。這可能是我的方法中的一個根本性錯誤,但現在我不想改變這一點。

是否有辦法阻止(某些)消息移動到錯誤隊列? Rebus的二級重試機制是否可以實現這一目標?

有關進一步參考下面的源代碼:

普通消費者:

class TimeEventHandler : IHandleMessages<TimeEvent> 
{ 
    public async Task Handle(TimeEvent message) 
    { 
     Console.WriteLine(message.Time); 

     //await Program.Activator.Bus.Reply("Ja danke"); 
    } 
} 

class Program 
{ 
    public static BuiltinHandlerActivator Activator; 

    static void Main(string[] args) 
    { 
     using (Activator = new BuiltinHandlerActivator()) 
     { 
      Activator.Register(() => new TimeEventHandler()); 

      var bus = Configure 
       .With(Activator) 
       .Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", 
        $"ConsumerPrototype").Prefetch(1)) 
       .Routing(r => r.TypeBased()) 
       .Start(); 

      bus.Subscribe<TimeEvent>(); 

      Console.WriteLine("Press enter to quit"); 
      Console.ReadLine(); 

      // Without the unsubscribe we have a durable subscriber, see http://www.enterpriseintegrationpatterns.com/patterns/messaging/DurableSubscription.html 
      //bus.Unsubscribe<TimeEvent>(); 
     } 
    } 
} 

故障消費者:

class FaultedTimeEventHandler : IHandleMessages<TimeEvent> 
{ 
    public async Task Handle(TimeEvent message) 
    { 
     throw new Exception("That should not have happened"); 
    } 
} 

class Program 
{ 
    public static BuiltinHandlerActivator Activator; 

    static void Main(string[] args) 
    { 
     using (Activator = new BuiltinHandlerActivator()) 
     { 
      Activator.Register(() => new FaultedTimeEventHandler()); 

      var bus = Configure 
       .With(Activator) 
       .Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", 
        $"ConsumerPrototype").Prefetch(1)) 
       .Routing(r => r.TypeBased()) 
       .Start(); 

      bus.Subscribe<TimeEvent>(); 

      Console.WriteLine("Press enter to quit"); 
      Console.ReadLine(); 
     } 
    } 
} 

出版商:

public static class PubSubTest 
{ 
    public static void Start() 
    { 
     Console.WriteLine("Starting PubSubTest"); 

     using (var activator = new BuiltinHandlerActivator()) 
     { 
      var bus = Configure 
       .With(activator) 
       .Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", "MessagingTest").Prefetch(1)) 
       .Routing(r => r.TypeBased()) 
       .Start(); 

      Observable 
       .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10)) 
       .Subscribe(_ => bus.Publish(new TimeEvent(DateTime.Now)).Wait()); 

      Console.WriteLine("Press enter to quit"); 
      Console.ReadLine(); 
     } 
    } 
} 

更新26.07.17: 我的結果與二級重試:激活它們後,我通過推遲和重新發送它們來處理它們。通過這樣做,我至少可以確保消息稍後處理,而不會丟失。

public async Task Handle(IFailed<TimeEvent> failedMessage) 
{ 
    await bus.Defer(TimeSpan.FromSeconds(30), failedMessage.Message); 
} 

這不是一個最佳的解決方案:

  1. 順序我的消息被改變:消息被延遲後,下一個消息從隊列消耗。
  2. Timeoutmanager是內存中的,我無權訪問SQL-Server。

我讀到可以在RabbitMQ中使用延遲消息。您可以使用dead-letter-exchangesplugin

將maxDeliveryAttempts增加到Int32.MaxValue非常骯髒,但按照我的要求:它保留消息,最重要的是消息在隊列中的順序。

我將此問題標記爲已解決,因爲「如何在Rebus中中止交易」這個問題已得到解答。