2017-10-13 161 views
0

我有2個系統,一個用於發佈消息和其他消耗它們。兩者都使用Masstransit(使用RabbitMQ),並使用ASP.Net web api 2和OWIN(以及Autofac作爲IoC容器)來實現。 一切工作正常,如果我的消費者沒有依賴關係,,但是當我注入一個依賴到我的消費者時,消費方法永遠不會執行(在初始化期間沒有錯誤拋出)。使用Autofac在Masstransit中註冊一個IConsumer <T>

這是相關出版商代碼:

//Startup.cs 
public class Startup 
{ 
    public void Configuration(IAppBuilder app) 
    { 
     HttpConfiguration config = new HttpConfiguration(); 

     IContainer container = null; 
     var builder = new ContainerBuilder(); 

     builder.Register(context => 
     { 
      var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
      { 
       IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings => 
       { 
        settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]); 
        settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]); 
       }); 
      }); 

      return busControl; 
     }) 
     .As<IBusControl>() 
     .As<IBus>() 
     .SingleInstance(); 

     // Register Web API controllers 
     builder.RegisterApiControllers(Assembly.GetExecutingAssembly()); 

     // Resolve dependencies 
     container = builder.Build(); 
     config.DependencyResolver = AutofacWebApiDependencyResolver(container); 

     WebApiConfig.Register(config); 
     SwaggerConfig.Register(config); 
     app.UseCors(CorsOptions.AllowAll); 

     // Register the Autofac middleware FIRST. 
     app.UseAutofacMiddleware(container); 
     app.UseWebApi(config); 

     // Starts MassTransit Service bus, and registers stopping of bus on app dispose 
     var bus = container.Resolve<IBusControl>(); 
     var busHandle = bus.StartAsync(); 
     var properties = new AppProperties(app.Properties); 
     if (properties.OnAppDisposing != CancellationToken.None) 
     { 
      properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30))); 
     } 
    } 
} 

// Controller 
public IHttpActionResult Post() 
{ 
    _bus.Publish<IFooMessage>(new 
    { 
     Foo = "Foo" 
    }); 

    return Ok(); 
} 

這是相關消費者代碼:

// Startup.cs 
public class Startup 
{ 
    public void Configuration(IAppBuilder app) 
    { 
     HttpConfiguration config = new HttpConfiguration(); 

     IContainer container = null; 
     var builder = new ContainerBuilder(); 

     builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest(); 
     builder.RegisterModule<BusModule>(); 
     builder.RegisterModule<ConsumersModule>(); 

     // Register Web API controllers 
     builder.RegisterApiControllers(Assembly.GetExecutingAssembly()); 

     // Resolve dependencies 
     container = builder.Build(); 
     config.DependencyResolver = new AutofacWebApiDependencyResolver(container); 

     WebApiConfig.Register(config); 
     SwaggerConfig.Register(config); 
     app.UseCors(CorsOptions.AllowAll); 

     // Register the Autofac middleware FIRST. 
     app.UseAutofacMiddleware(container); 
     app.UseWebApi(config); 

     // Starts MassTransit Service bus, and registers stopping of bus on app dispose 
     var bus = container.Resolve<IBusControl>(); 
     var busHandle = bus.StartAsync(); 
     var properties = new AppProperties(app.Properties); 
     if (properties.OnAppDisposing != CancellationToken.None) 
     { 
      properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30))); 
     } 
    } 
} 

// BusModule.cs 
public class BusModule : Module 
{ 
    protected override void Load(ContainerBuilder builder) 
    { 
     builder.Register(context => 
     { 
      var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
      { 
       IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings => 
       { 
        settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]); 
        settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]); 
       }); 
       cfg.ReceiveEndpoint(rabbitMqHost, "IP.AgilePoint.queue", ec => 
       { 
        ec.LoadFrom(context); 
       }); 
      }); 

      return busControl; 
     }) 
     .SingleInstance() 
     .As<IBusControl>() 
     .As<IBus>(); 
    } 
} 

// ConsumerModule.cs 
public class ConsumersModule : Module 
{ 
    protected override void Load(ContainerBuilder builder) 
    { 
     builder.RegisterType<FooConsumer>(); 
    } 
} 

// FooConsumer.cs 
public class FooConsumer : IConsumer<IFooMessage> 
{ 
    private IFooService _service; 

    public FooConsumer(IFooService service) 
    { 
     _service = service; 
    } 

    public Task Consume(ConsumeContext<IFooMessage> context) 
    { 
     IFooMessage @event = context.Message; 

     _service.DoStuff(@event.Foo); 

     return Task.FromResult(context.Message); 
    } 
} 

注意我FooConsumer具有相關性(構造函數)上IFooService。 我已經關注Masstransit documentation,但我無法得到這個工作。我究竟做錯了什麼?

框架版本:

  • .Net框架4.6.1
  • Autofac:3.5.2
  • Masstransit:3.5.7

更新時間:

代碼可以發現in this Github repository

回答

0

最後我發現我在做什麼錯。出於某種原因,我無法在RabbitMQ Management中看到我的隊列。當我已經能看到錯誤隊列,我注意到德以下錯誤:

RabbitMQ Error

我登記我IFooService這樣:

builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest(); 

InstancePerRequest()是造成錯誤。如果我註冊服務builder.RegisterType<FooService>().As<IFooService>()一切工作正常。我認爲這是因爲我的公交車實例是作爲單身人士(登記爲SingleIsntace())。使用web api項目託管我的公共汽車/消費者的事實讓我感到困惑。

無論如何,感謝@Chris Patterson和@Alexey Zimarev的正確實施方向(使用MT擴展來註冊用戶)。

+1

每個請求註冊只有在Autofac管理請求範圍時才適用,並且這是在WebAPI集成插件中完成的。 MassTransit不使用它。默認註冊是*不是單身*,它是每個依賴。這是正確的範圍。 MassTransit爲每條消息實例化一個消費者。 –

3

我建議查看特定於Autofac的文檔,它是通過擴展庫完全支持的容器。

http://masstransit-project.com/MassTransit/usage/containers/autofac.html

包裝: https://www.nuget.org/packages/masstransit.autofac

+0

我覺得這是我在做什麼,除非我正在做一個愚蠢的錯誤,我看不到...... 我FooConsumer構造函數永遠不會被調用 –

+0

你問MT從上下文加載,但我不能查看使用Autofac註冊用戶的任何代碼。如果他們沒有註冊,你如何期待他們解決? –