2017-04-26 49 views
0

我有一個使用Masstransit/RabbitMq在Web項目中使用的企業服務總線。我使用Masstransit的Request/Response模式來完成RPC over MQ。使用MassTransit請求/響應對話擴展訂戶

我在處理不同類型消息的總線中創建多個ReceiveEndpoint。 ESB使用自定義配置文件來創建多個總線,所以我可以以某種方式在邏輯上實現Qos,甚至在單獨的服務器上通過網絡分發消費者以增加或多或少的性能。

最近我看到阻止我的一個消費者。如果我向長時間運行的消費者發送一些消息(每個消費者需要大約5秒的工作),它看起來像一個線程迴應了我的請求,同時發送的消息等待先前的消息被消耗。

到目前爲止,我設置了UseConcurrencyLimit(64),它什麼都沒變,試圖將PrefetchCount增加到50,但是RabbitMq在隊列細節中顯示它爲0。

爲什麼消費者一次只能處理一條消息?

MassTransit v3.5.7

編輯:我發現this thread後。它對我來說看起來是同樣的問題。

UPDATE與Chris的示例有什麼不同,我使用RabbitMq並使用反射來創建消費者,這樣我就可以使用配置文件管理ESB。在RabbitMq管理控制檯上仍然保留Prefetch Count 0。

var busControl = Bus.Factory.CreateUsingRabbitMq(x => 
     { 
      x.UseConcurrencyLimit(64); 
      IRabbitMqHost host = x.Host(new Uri(Config.host), h => 
      { 
       h.Username("userName"); 
       h.Password("password"); 
      }); 

      var obj = Activator.CreateInstance([SomeExistingType]); 

      x.ReceiveEndpoint(host, "queueName", e => 
      { 
       e.PrefetchCount = 64;//config.prefetchCount; 
       e.PurgeOnStartup = true; 
       if (config.retryCount > 0) 
       { 
        e.UseRetry(retry => retry.Interval(config.retryCount, config.retryInterval)); 
       } 
       e.SetQueueArgument("x-expires", config.timeout * 1000 /*Seconds to milliseconds*/); 
       e.SetQueueArgument("temporary", config.temporary); 

       e.Consumer(consumer, f => obj); 

      }); 

     }) 

busControl.StartAsync();

UPDATE 2雖然我設置預取計數爲1讓MassTransit處理工作量,因爲我有幾個使用者服務器和一個RabbitMq羣集。然而,當我發送很多消息到隊列使所有線程忙碌時,發送方隊列中的新請求將被一個空閒線程接收。我通過這種方式增加了預取計數,最終爲新的請求提供了更多的空閒線程。我設置預取計數,同時配置接收端點爲克里斯提醒我。謝謝。一旦克里斯確認,我會將Chris的答覆標記爲答案。

+0

在你的例子中,你正在使用一個消費者的所有消息。通常不是最好的選擇,但如果堅持下去,請確保您的客戶沒有任何實例變量,因爲它們將被所有併發消息共享。 –

+0

您可以通過將'Activator.CreateInstance'調用移動到'e.Consumer()'調用的lambda方法來解決這個問題。 –

+0

@ChrisPatterson非常感謝,我以前從你的回覆中讀過那位評論家。在這個例子中,我覺得我需要修改對於這個特定問題更簡單的方法。實際上,我爲每種消息類型創建了許多ReceiveEnpoints。除此之外,我還創建了幾條總線,將一些消息分組在一起,因爲其中一些使用請求/響應模式,而其他消息則用於發佈/訂閱。 –

回答

1

如果您在RabbitMQ控制檯中看到0用於預取計數,則可能是將它配置在錯誤的位置。你應該沿着線的東西:

cfg.ReceiveEndpoint("my-queue", x => 
{ 
    x.PrefetchCount = 64; 
    x.Consumer<MyConsumer>(); 
}); 

這將配置接收終端消費的64預取數,應在RabbitMQ的管理控制檯上的消費者出現。

另一方面,如果您的客戶使用類似Thread.Sleep()的東西阻塞線程,我已經看到了可能會限制線程池併發的情況。

+0

謝謝你的回答。我已經更新了我的原始問題,請問我是否缺少一些東西? –

+0

另一個更新:eventhough RabbitMq顯示Prefetch計數爲0,實際上不是0.我可以說做了一些測試後。而你的答案看起來像是對我的作品,我會解釋我原來的問題中我錯了什麼。請確認... –