我有一個使用Masstransit/RabbitMq在Web項目中使用的企業服務總線。我使用Masstransit的Request/Response模式來完成RPC over MQ。使用MassTransit請求/響應對話擴展訂戶
我在處理不同類型消息的總線中創建多個ReceiveEndpoint。 ESB使用自定義配置文件來創建多個總線,所以我可以以某種方式在邏輯上實現Qos,甚至在單獨的服務器上通過網絡分發消費者以增加或多或少的性能。
最近我看到阻止我的一個消費者。如果我向長時間運行的消費者發送一些消息(每個消費者需要大約5秒的工作),它看起來像一個線程迴應了我的請求,同時發送的消息等待先前的消息被消耗。
到目前爲止,我設置了UseConcurrencyLimit(64)
,它什麼都沒變,試圖將PrefetchCount
增加到50,但是RabbitMq在隊列細節中顯示它爲0。
爲什麼消費者一次只能處理一條消息?
MassTransit v3.5.7
編輯:我發現this thread後。它對我來說看起來是同樣的問題。
UPDATE與Chris的示例有什麼不同,我使用RabbitMq並使用反射來創建消費者,這樣我就可以使用配置文件管理ESB。在RabbitMq管理控制檯上仍然保留Prefetch Count 0。
var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.UseConcurrencyLimit(64);
IRabbitMqHost host = x.Host(new Uri(Config.host), h =>
{
h.Username("userName");
h.Password("password");
});
var obj = Activator.CreateInstance([SomeExistingType]);
x.ReceiveEndpoint(host, "queueName", e =>
{
e.PrefetchCount = 64;//config.prefetchCount;
e.PurgeOnStartup = true;
if (config.retryCount > 0)
{
e.UseRetry(retry => retry.Interval(config.retryCount, config.retryInterval));
}
e.SetQueueArgument("x-expires", config.timeout * 1000 /*Seconds to milliseconds*/);
e.SetQueueArgument("temporary", config.temporary);
e.Consumer(consumer, f => obj);
});
})
busControl.StartAsync();
UPDATE 2雖然我設置預取計數爲1讓MassTransit處理工作量,因爲我有幾個使用者服務器和一個RabbitMq羣集。然而,當我發送很多消息到隊列使所有線程忙碌時,發送方隊列中的新請求將被一個空閒線程接收。我通過這種方式增加了預取計數,最終爲新的請求提供了更多的空閒線程。我設置預取計數,同時配置接收端點爲克里斯提醒我。謝謝。一旦克里斯確認,我會將Chris的答覆標記爲答案。
在你的例子中,你正在使用一個消費者的所有消息。通常不是最好的選擇,但如果堅持下去,請確保您的客戶沒有任何實例變量,因爲它們將被所有併發消息共享。 –
您可以通過將'Activator.CreateInstance'調用移動到'e.Consumer()'調用的lambda方法來解決這個問題。 –
@ChrisPatterson非常感謝,我以前從你的回覆中讀過那位評論家。在這個例子中,我覺得我需要修改對於這個特定問題更簡單的方法。實際上,我爲每種消息類型創建了許多ReceiveEnpoints。除此之外,我還創建了幾條總線,將一些消息分組在一起,因爲其中一些使用請求/響應模式,而其他消息則用於發佈/訂閱。 –