我有兩個使用Rebus的控制檯應用程序。它們都引用定義消息(命令和事件)的程序集。 控制檯應用程序「A」發送命令並偵聽事件以便記錄(例如:發送CreateTCommand並偵聽TCreatedEvent)。控制檯應用程序「B」(實際上是一個ASP.NET Core應用程序)偵聽命令並處理它們(例如:一個傳奇由CreateTCommand啓動,聚合被創建並引發一個TCreatedEvent)。在應用程序「B」的進程內的另一個DLL中,有一個TCreatedEvent的處理程序。Rebus:2個進程中的2個處理程序。點擊不一致和交替
所以,我必須通過應用「A」和兩個處理程序所創建的事件,一個應用程序中的「A」,一個在應用「B」發送的創建命令。
問題:當我從應用程序發送命令「A」的第一次,應用程序「B」引發創建的事件,其觸發在同一進程中的處理程序。應用程序「A」中的處理程序 未被觸發。 來自應用程序「A」的進一步命令總是由應用程序「B」中的傳奇來處理,但創建的事件不會再次在該進程中擊中處理程序,而是由應用程序「A」處理! 有時候,(我不明白如何重現)從應用程序的命令「A」不是由傳奇中的應用「B」處理(我發現在MSMQ錯誤隊列與例外的命令「與ID消息不能分派給任何處理者「)。 有時(很少)這兩個處理程序都被擊中。但我不能重現一致的行爲......
我的這個感覺(知道幾乎一無所知滷麪,這是相當新的給我):
- 可能它是一個併發問題?我的意思是:Rebus被配置爲在進程外部存儲訂閱(使用SQL或Mongo,問題不會消失),所以我認爲可能第一個處理程序太快並且將事件標記爲在第二個處理程序之前處理調用
- 檢查訂閱SQL表,我發現5行(即我已經在認購代碼(在應用程序啓動時使用bus.Subscribe())具有相同的地址每種類型的事件之一(隊列名稱鏈接。我的本地機器名),這是個問題有2個進程試圖消耗它只有一個地址
的滷麪的配置代碼是相同的2個應用程序和雲:?
const string inputQueueAddress = "myappqueue";
var mongoClient = new MongoClient("mongodb://localhost:27017");
var mongoDatabase = mongoClient.GetDatabase("MyAppRebusPersistence");
var config = Rebus.Config.Configure.With(new NetCoreServiceCollectionContainerAdapter(services))
.Logging(l => l.Trace())
.Routing(r => r.TypeBased()
.MapAssemblyOf<AlertsCommandStackAssemblyMarker>(inputQueueAddress)
.MapAssemblyOf<AlertsQueryStackAssemblyMarker>(inputQueueAddress)
)
.Subscriptions(s => s.StoreInMongoDb(mongoDatabase, "subscriptions"))
.Sagas(s => s.StoreInMongoDb(mongoDatabase))
.Timeouts(t => t.StoreInMongoDb(mongoDatabase, "timeouts"))
.Transport(t => t.UseMsmq(inputQueueAddress));
var bus = config.Start();
bus.Subscribe<AlertDefinitionCreatedEvent>();
bus.Subscribe<AlertStateAddedEvent>();
bus.Subscribe<AlertConfigurationForEhrDefinitionAddedEvent>();
services.AddSingleton(bus);
services.AutoRegisterHandlersFromThisAssembly();
我希望有人能幫助,這是推動我堅果...
P.S:這個問題是存在也通過isCentralized時:真實subscription.StoreInMongoDb()。
編輯1:我添加了控制檯日誌,你可以看到這種奇怪的行爲: https://postimg.org/image/czz5lchp9/
第一個命令發送成功。它由事件處理,事件觸發了控制檯應用程序「A」中的處理程序。 滷麪說,第二個命令並沒有派出任何處理程序,但它實際上是由傳奇處理(我跟着調試的代碼),而該事件是由處理器在應用「B」處理,而不是「A」 ......爲什麼? (
編輯2:我調試滷麪的源代碼,我注意到,在ThreadPoolWorker。TryAsyncReceive
async void TryAsyncReceive(CancellationToken token, IDisposable parallelOperation)
{
try
{
using (parallelOperation)
using (var context = new DefaultTransactionContext())
{
var transportMessage = await ReceiveTransportMessage(token, context);
if (transportMessage == null)
{
context.Dispose();
// no need for another thread to rush in and discover that there is no message
//parallelOperation.Dispose();
_backoffStrategy.WaitNoMessage();
return;
}
_backoffStrategy.Reset();
await ProcessMessage(context, transportMessage);
}
}
的TCreatedEvent後
由應用程序 「B」 上發表,在應用CS,方法 「A」 的代碼到達 等待ProcessMessage的(上下文,transportMessage) 其中transportMessage是實際的事件。這行代碼在應用程序「B」的進程中沒有達到。似乎消息的第一個接收者將其從MSMQ的隊列中移除。正如我所說,我對Rebus和公交車一般都很陌生,但是如果這種行爲符合我的設計,我會感到很困惑......多個進程中的多個總線如何能夠聽到相同的隊列?
哇,好吧......因爲我調試後懷疑。 正如我所說,我對這個世界很陌生,對於我被告知如何使用公共汽車,我曾設想將我們的應用程序當前執行的許多後端操作分割爲許多單獨的應用程序(Windows服務),都指向通過使用Rebus相同的隊列。 現在你告訴我這些處理程序需要在主應用程序中生存......不是悲劇,而是添加新的處理程序意味着重新編譯和重新啓動應用程序池,而不是隻打開其他服務。或者這是否意味着我沒有想到的其他事情? – Etchelon
好了解。我會嘗試這樣的: 主webapp有一個rebus singleton配置爲接收類似於「ExternalServiceStarted」和「ExternalServiceStopped」的消息。 「ExternalServiceStarted」通知主Web應用程序將某些消息類型的映射添加到新隊列中,以便新服務可以偵聽某些事件,這些事件因此將被分派到(1 + n)個隊列,其中n是數字的外部服務。停止的事件會刪除映射以避免堵塞未處理的隊列。 你覺得呢? – Etchelon