2012-09-17 24 views
4

如果您使用相同的端點名稱創建發佈者和使用者,我們遇到MassTransit正在丟失消息的情況。MassTransit丟失消息 - Rabbit MQ - 當發佈者和消費者端點名稱相同時,

請注意下面的代碼;如果我爲使用者或發佈者使用了不同的端點名稱(例如,發佈者的「rabbitmq:// localhost/mtlossPublised」),則該消息會統計發佈和使用的匹配;如果我使用相同的端點名稱(如示例中所示),那麼消耗的消息數少於發佈的消息數。

這是預期的行爲?或者我做錯了什麼,在下面的工作示例代碼。

using MassTransit; 
using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading.Tasks; 

namespace MTMessageLoss 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var consumerBus = ServiceBusFactory.New(b => 
      { 
       b.UseRabbitMq(); 
       b.UseRabbitMqRouting(); 
       b.ReceiveFrom("rabbitmq://localhost/mtloss"); 
      }); 
      var publisherBus = ServiceBusFactory.New(b => 
      { 
       b.UseRabbitMq(); 
       b.UseRabbitMqRouting(); 
       b.ReceiveFrom("rabbitmq://localhost/mtloss"); 
      }); 
      consumerBus.SubscribeConsumer(() => new MessageConsumer()); 
      for (int i = 0; i < 10; i++) 
       publisherBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), Message = string.Format("This is message {0}", i) }); 
      Console.WriteLine("Press ENTER Key to see how many you consumed"); 
      Console.ReadLine(); 
      Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count); 
      Console.ReadLine(); 
      consumerBus.Dispose(); 
      publisherBus.Dispose(); 
     } 
    } 
    public interface ISimpleMessage : CorrelatedBy<Guid> 
    { 
     string Message { get; } 
    } 
    public class SimpleMessage : ISimpleMessage 
    { 
     public Guid CorrelationId { get; set; } 
     public string Message { get; set; } 
    } 
    public class MessageConsumer : Consumes<ISimpleMessage>.All 
    { 
     public static int Count = 0; 
     public void Consume(ISimpleMessage message) 
     { 
      System.Threading.Interlocked.Increment(ref Count); 
     } 
    } 
} 

回答

4

底線微調的示例代碼,總線的每個實例都需要它自己的隊列進行讀取。即使總線只存在發佈消息。這只是MassTransit如何工作的要求。

http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - 請參閱警告。

當兩個總線實例共享相同的隊列時,我們將行爲保留爲未定義。無論如何,這不是我們支持的條件。每個總線實例都可以將元數據發送到其他總線實例,並且需要它自己的端點。這對MSMQ來說是一個更大的交易,所以也許我們可以將這個案例應用到RabbitMQ上 - 但這不是我們在這一點上花費太多思考的東西。

+0

Travis,Binary Worrier。感謝您的投入。感謝鏈接到Travis文檔的鏈接;我以前從未見過;我原以爲我已經閱讀了網站上的所有文檔。所有這些顯然沒有沉入:) – Bigtoe

+0

@Bigtoe如果您對如何通過更明確的方式使第一次錯過的任何想法有任何想法,我們很樂意聽取您的意見。無論什麼事情都可以讓人們更容易地接觸MT,這很棒。 – Travis

+0

鏈接現已停止 – Don

1

發生了什麼事是,在給予相同的接收器烏里你告訴MT加載在兩條總線平衡消費,但是你只有一個總線聽消息。

如果你得到它以跟蹤哪些消息被收到,你會看到它(幾乎)每隔一秒。

已經優化了你的示例代碼,我得到

We consumed 6 simple messages. Press Enter to terminate the applicaion. 
Received 0 
Received 3 
Received 5 
Received 6 
Received 7 
Received 8 

其他總線上啓動一個消費者,你會得到他們所有

We consumed 10 simple messages. Press Enter to terminate the applicaion. 
Received 0 
Received 1 
Received 2 
Received 3 
Received 4 
Received 5 
Received 6 
Received 7 
Received 8 
Received 9 

所以,是的,我會說這是預期行爲。

這裏有兩個用戶

using MassTransit; 
using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading.Tasks; 

namespace MTMessageLoss 
{ 
    class Program 
    { 
     internal static bool[] msgReceived = new bool[10]; 
     static void Main(string[] args) 
     { 
      var consumerBus = ServiceBusFactory.New(b => 
       { 
        b.UseRabbitMq(); 
        b.UseRabbitMqRouting(); 
        b.ReceiveFrom("rabbitmq://localhost/mtloss"); 
       }); 
      var publisherBus = ServiceBusFactory.New(b => 
       { 
        b.UseRabbitMq(); 
        b.UseRabbitMqRouting(); 
        b.ReceiveFrom("rabbitmq://localhost/mtloss"); 
       }); 
      publisherBus.SubscribeConsumer(() => new MessageConsumer()); 
      consumerBus.SubscribeConsumer(() => new MessageConsumer()); 
      for (int i = 0; i < 10; i++) 
       consumerBus.Publish(new SimpleMessage() 
        {CorrelationId = Guid.NewGuid(), MsgId = i}); 
      Console.WriteLine("Press ENTER Key to see how many you consumed"); 
      Console.ReadLine(); 
      Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", 
           MessageConsumer.Count); 
      for (int i = 0; i < 10; i++) 
       if (msgReceived[i]) 
        Console.WriteLine("Received {0}", i); 
      Console.ReadLine(); 
      consumerBus.Dispose(); 
      publisherBus.Dispose(); 

     } 
    } 
    public interface ISimpleMessage : CorrelatedBy<Guid> 
    { 
     int MsgId { get; } 
    } 
    public class SimpleMessage : ISimpleMessage 
    { 
     public Guid CorrelationId { get; set; } 
     public int MsgId { get; set; } 
    } 
    public class MessageConsumer : Consumes<ISimpleMessage>.All 
    { 
     public static int Count = 0; 
     public void Consume(ISimpleMessage message) 
     { 
      Program.msgReceived[message.MsgId] = true; 
      System.Threading.Interlocked.Increment(ref Count); 
     } 
    } 
} 
+0

好的這確實解釋了一定程度;但在出版社巴士上的原始樣本中沒有訂閱;所以它們永遠不會被出版商總線消耗掉,所以爲什麼要平衡負載。 另外我不確定你的解釋是什麼,因爲這些信息不應該丟失;他們應該一直消耗。在我們的例子中,只有一個消費者;端點配置不應該確定負載平衡;這是應該確定的註冊消費者。 – Bigtoe

+0

這不是與消費相關的總線實例,而是接收方URI(首先,MT如何爲您提供跨服務的負載平衡)。對我來說,MT完全可以看到這個配置,並且說:「我有兩個相同總線的實例(同一個Uri),並且我被告知該_type_總線消耗消息x,所以我會負載平衡他們之間的消息「。 tl; dr **如果你不想平衡兩條公共汽車,給它們不同的名稱** –

+0

如果這就是MT如何做負載平衡,那麼我猜我只是「必須吸那個檸檬」。但從出版的角度來看,它根本沒有意義。發佈者不應該給「猴子叔叔」誰,什麼,何時,何地或如何消費該消息;它應該只是發佈該死的東西。因此,我們現在使用URI將消費者和發佈者捆綁在一起;我們的意思是出版中的消費者邏輯。呸!真正的發佈URI應該沒有端點名稱,除了如何連接到排隊基礎結構。 – Bigtoe

相關問題