2016-11-18 14 views
1

我有兩個使用Rebus的控制檯應用程序。它們都引用定義消息(命令和事件)的程序集。 控制檯應用程序「A」發送命令並偵聽事件以便記錄(例如:發送CreateTCommand並偵聽TCreatedEvent)。控制檯應用程序「B」(實際上是一個ASP.NET Core應用程序)偵聽命令並處理它們(例如:一個傳奇由CreateTCommand啓動,聚合被創建並引發一個TCreatedEvent)。在應用程序「B」的進程內的另一個DLL中,有一個TCreatedEvent的處理程序。Rebus:2個進程中的2個處理程序。點擊不一致和交替

所以,我必須通過應用「A」和兩個處理程序所創建的事件,一個應用程序中的「A」,一個在應用「B」發送的創建命令。

問題:當我從應用程序發送命令「A」的第一次,應用程序「B」引發創建的事件,其觸發在同一進程中的處理程序。應用程序「A」中的處理程序 未被觸發。 來自應用程序「A」的進一步命令總是由應用程序「B」中的傳奇來處理,但創建的事件不會再次在該進程中擊中處理程序,而是由應用程序「A」處理! 有時候,(我不明白如何重現)從應用程序的命令「A」不是由傳奇中的應用「B」處理(我發現在MSMQ錯誤隊列與例外的命令「與ID消息不能分派給任何處理者「)。 有時(很少)這兩個處理程序都被擊中。但我不能重現一致的行爲......

我的這個感覺(知道幾乎一無所知滷麪,這是相當新的給我):

  • 可能它是一個併發問題?我的意思是:Rebus被配置爲在進程外部存儲訂閱(使用SQL或Mongo,問題不會消失),所以我認爲可能第一個處理程序太快並且將事件標記爲在第二個處理程序之前處理調用
  • 檢查訂閱SQL表,我發現5行(即我已經在認購代碼(在應用程序啓動時使用bus.Subscribe())具有相同的地址每種類型的事件之一(隊列名稱鏈接。我的本地機器名),這是個問題有2個進程試圖消耗它只有一個地址

的滷麪的配置代碼是相同的2個應用程序和雲:?

 const string inputQueueAddress = "myappqueue"; 
     var mongoClient = new MongoClient("mongodb://localhost:27017"); 
     var mongoDatabase = mongoClient.GetDatabase("MyAppRebusPersistence"); 
     var config = Rebus.Config.Configure.With(new NetCoreServiceCollectionContainerAdapter(services)) 
      .Logging(l => l.Trace()) 
      .Routing(r => r.TypeBased() 
       .MapAssemblyOf<AlertsCommandStackAssemblyMarker>(inputQueueAddress) 
       .MapAssemblyOf<AlertsQueryStackAssemblyMarker>(inputQueueAddress) 
      ) 
      .Subscriptions(s => s.StoreInMongoDb(mongoDatabase, "subscriptions")) 
      .Sagas(s => s.StoreInMongoDb(mongoDatabase)) 
      .Timeouts(t => t.StoreInMongoDb(mongoDatabase, "timeouts")) 
      .Transport(t => t.UseMsmq(inputQueueAddress)); 

     var bus = config.Start(); 
     bus.Subscribe<AlertDefinitionCreatedEvent>(); 
     bus.Subscribe<AlertStateAddedEvent>(); 
     bus.Subscribe<AlertConfigurationForEhrDefinitionAddedEvent>(); 
     services.AddSingleton(bus); 

     services.AutoRegisterHandlersFromThisAssembly(); 

我希望有人能幫助,這是推動我堅果...

P.S:這個問題是存在也通過isCentralized時:真實subscription.StoreInMongoDb()。

編輯1:我添加了控制檯日誌,你可以看到這種奇怪的行爲: https://postimg.org/image/czz5lchp9/

第一個命令發送成功。它由事件處理,事件觸發了控制檯應用程序「A」中的處理程序。 滷麪說,第二個命令並沒有派出任何處理程序,但它實際上是由傳奇處理(我跟着調試的代碼),而該事件是由處理器在應用「B」處理,而不是「A」 ......爲什麼? (

編輯2:我調試滷麪的源代碼,我注意到,在ThreadPoolWorker。TryAsyncReceive

async void TryAsyncReceive(CancellationToken token, IDisposable parallelOperation) 
    { 
     try 
     { 
      using (parallelOperation) 
      using (var context = new DefaultTransactionContext()) 
      { 
       var transportMessage = await ReceiveTransportMessage(token, context); 

       if (transportMessage == null) 
       { 
        context.Dispose(); 

        // no need for another thread to rush in and discover that there is no message 
        //parallelOperation.Dispose(); 

        _backoffStrategy.WaitNoMessage(); 
        return; 
       } 

       _backoffStrategy.Reset(); 

       await ProcessMessage(context, transportMessage); 
      } 
     } 
的TCreatedEvent後

由應用程序 「B」 上發表,在應用CS,方法 「A」 的代碼到達 等待ProcessMessage的(上下文,transportMessage) 其中transportMessage是實際的事件。這行代碼在應用程序「B」的進程中沒有達到。似乎消息的第一個接收者將其從MSMQ的隊列中移除。正如我所說,我對Rebus和公交車一般都很陌生,但是如果這種行爲符合我的設計,我會感到很困惑......多個進程中的多個總線如何能夠聽到相同的隊列?

回答

1

不應該有兩個總線實例接收來自同一隊列的消息,,除非它們是同一端點的多個實例。

當兩個進程使用相同的輸入隊列時,他們將相互收到消息。如果您正在實施competing consumers pattern,使用它可以在工作進程集羣之間平均分配工作,但無法在多個不同端點之間共享輸入隊列,這可能絕對沒有問題。

我的猜測是,一切看上去都更具可預測性,如果你讓每個總線實例使用它自己的輸入隊列;)

現在你告訴我這些處理程序需要住在主應用程序內

不,我不是:)我告訴你,如果你讓兩個不同的應用程序搶奪對方的消息,你將得到不可預知的結果。

雖然所有處理程序都處於同一個總線實例中(因此會被來自同一隊列的消息調用),但最常見的情況是您將以與您想要的方式匹配的方式拆分應用程序發展該系統。

這樣,您可以一次更新一個應用程序,避免大的「停止世界」 - 更新。然後你端點之間路由消息,以溝通 -

您通過啓動幾個端點,每個都使用自己的隊列做到這一點。

考慮一種您想要將命令發送到命令處理器的場景。命令處理器是一個Rebus端點,從command_processor隊列中獲取其消息。

在發送方的您將配置一個「端點映射」(你可以在the routing section on the Rebus wiki閱讀更多關於這可能是這樣的:

Configure.With(...) 
    .Transport(t => t.UseMsmq("sender")) 
    .Routing(r => { 
     r.TypeBased() 
      .Map<TheCommand>("command_processor"); 
    }) 
    .Start(); 

這將使發送者簡單地去

await bus.Send(new TheCommand(...)); 

然後總線會知道哪個隊列發送命令消息到

我希望這樣會更清楚:)

請注意,這是的point-to-point messaging一個非常簡單的例子,其中一個端點發送其目的是要通過一個單一的另一個端點被消耗的消息。 Rebus還有其他幾種模式可供您使用,例如request/replypublish/subscribe

+0

哇,好吧......因爲我調試後懷疑。 正如我所說,我對這個世界很陌生,對於我被告知如何使用公共汽車,我曾設想將我們的應用程序當前執行的許多後端操作分割爲許多單獨的應用程序(Windows服務),都指向通過使用Rebus相同的隊列。 現在你告訴我這些處理程序需要在主應用程序中生存......不是悲劇,而是添加新的處理程序意味着重新編譯和重新啓動應用程序池,而不是隻打開其他服務。或者這是否意味着我沒有想到的其他事情? – Etchelon

+0

好了解。我會嘗試這樣的: 主webapp有一個rebus singleton配置爲接收類似於「ExternalServiceStarted」和「ExternalServiceStopped」的消息。 「ExternalServiceStarted」通知主Web應用程序將某些消息類型的映射添加到新隊列中,以便新服務可以偵聽某些事件,這些事件因此將被分派到(1 + n)個隊列,其中n是數字的外部服務。停止的事件會刪除映射以避免堵塞未處理的隊列。 你覺得呢? – Etchelon

相關問題