2016-10-07 49 views
1

我在常規控制檯應用程序中使用了以下簡單代碼,並且會期望(某些)命令被並行使用。我認爲UseConcurrencyLimit會設置併發線程的數量。我所看到的是,RabbitMQ確實有10個未消息的消息,但是消費者將它們串行消費,每個console.writeline之間停頓一秒鐘。我必須錯過明顯的東西,但我不明白。爲什麼MassTransit只在單個線程上使用消息

public static class EventHandler 
{ 
    public static void Run() 
    { 
     var personQueueName = "RabbitMqPoc.Person"; 

     var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
     { 

      cfg.UseConcurrencyLimit(10); 

      var host = cfg.Host(new Uri("rabbitmq://localhost"), h => 
      { 
       h.Username("guest"); 
       h.Password("guest"); 
      }); 

      cfg.ReceiveEndpoint(host, personQueueName, e => 
      { 
       e.UseConcurrencyLimit(10); 
       e.PrefetchCount = 10; 
       e.Consumer<PersonConsumer>(); 
      }); 
     }); 

     var personSendEndpoint = busControl.GetSendEndpoint(new Uri($"rabbitmq://localhost/{personQueueName}")).Result; 

     busControl.Start(); 

     foreach (var x in Enumerable.Range(1, 20)) 
     { 
      personSendEndpoint.Send(new PersonUpdated() { Name = "Mina Ives", Key = Guid.NewGuid() }); 
     } 

     Console.ReadLine(); 
     busControl.Stop(); 
    } 
} 

internal class PersonConsumer : IConsumer<IPersonUpdated> 
{ 
    public async Task Consume(ConsumeContext<IPersonUpdated> context) 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine($"Updated {context.Message.Name}"); 
    } 

} 

回答

1

更改Thread.Sleep(1000);await Task.Delay(1000);解決您遇到的問題。

Thread.Sleep由於某些原因對TPL造成嚴重破壞。

相關問題