2016-11-30 53 views
1

我試圖將消息從錯誤隊列移回到它起源的隊列。 爲此,我在錯誤隊列上創建了一個使用者,然後將其發佈到必需隊列中。 當我嘗試這樣做時,消費消息的一半被髮布,但另一半被髮送到Error_Skipped隊列。MassTransit RabbitMQ將錯誤隊列上消費的消息的一半移動到Error_Skipped隊列

我已經嘗試過許多事情沒有成功,所以它可能是簡單的,我失蹤了。

這裏是我的代碼示例:

public class ClaimsMessage 
{ 
    public string Description { get; set; } 

    public DateTime Date { get; set; } 

    public bool Handled { get; set; } 
} 

public class ClaimsMessageErrorConsumer : IConsumer<Fault<ClaimsMessage>> 
{ 
    public async Task Consume(ConsumeContext<Fault<ClaimsMessage>> context) 
    { 
     try 
     { 
      await context.Publish<ClaimsMessage>(context.Message.Message); 

     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

public static IBusControl CreateClaimsErrorConsumerBus(string endPoint) 
{ 
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
    { 
     var host = cfg.Host(new Uri("rabbitmq://localhost/"), h => 
     { 
      h.Username("guest"); 
      h.Password("guest"); 
     }); 

     cfg.ReceiveEndpoint(host, endPoint, e => 
     { 
      e.Consumer(() => new ClaimsMessageErrorConsumer()); 
     }); 
    }); 
    return busControl; 
} 
+0

你有沒有考慮使用[鏟子?](https://www.rabbitmq.com/shovel.html) – stuartd

+0

我看過一鏟。但它有點基礎。 我想添加規則,這將允許我們只能將某些消息再次移回,稍後再移回其他消息。 –

+0

可以試試[郵件列表](https://groups.google.com/forum/#!forum/masstransit-discuss) – stuartd

回答

1

如果是從一個錯誤隊列回處理隊列移動郵件,你不應該調用Publish - 這將重新發送郵件給所有的用戶。您已經知道隊列名稱,因此將消息直接發送回隊列。你所看到的是,你已經在錯誤隊列上創建了一個消費者,它爲該消息創建了一個交換綁定。

那麼,這樣做,而不是:

sbc.ReceiveEndpoint("input_error", x => 
{ 
    // this prevents extra message bindings from being created 
    x.BindMessageExchanges = false; 

    x.Consumer<MyMover>(() => new MyMover(inputQueueAddress); 
}); 

public class MyMover : 
    IConsumer<ClaimsMessage> 
{ 
    public async Task Consume(ConsumeContext<ClaimsMessage> context) 
    { 
     try 
     { 
      var endpoint = await context.GetSendEndpoint(_inputQueueAddress); 
      await endpoint.Send<ClaimsMessage>(context.Message); 
     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

對於額外的信用,拷貝過來的原始郵件頭,因此消息的保真度被保留。

+0

謝謝你的回覆。 不幸的是,它仍然是這樣做的,一條消息放在Claim Queue上,另一條放在Claims_error_skipped隊列上。 雖然我沒有複製標題。你認爲這可能導致它? –

+0

嗨克里斯。 我已經創建了一個示例項目,可以在這裏下載: https://drive.google.com/file/d/0B0FYiKs0DMyrYTJfZTcxdVlKSDg/view?usp=sharing[link] 它包括我如何複製它的分步說明。 我完全可能錯過了一些東西,但我已經閱讀了相當廣泛的文檔。 –

+0

如果您正在從錯誤隊列中讀取消息,則應使用原始消息,而不是錯誤。實際上,您已在錯誤隊列中爲故障創建了額外的使用者綁定,除了從隊列中移出的原始消息之外,它還將獲取已發佈的錯誤。消耗T代替,並清理RMQ中的綁定,並且你全部設置好了。 –

相關問題