2016-11-09 162 views
1

我一直在使用ServiceStack MQ服務器/客戶端來增強我平臺中基於消息的體系結構,並且它一直在完美地工作。我現在試圖做一些我不相信是由SS消息製作者/消費者支持的東西。ServiceStack消息過濾

本質上我是在一個集中的數據中心發射消息(事件),並且在美國有遍佈美國的2000個分佈式節點,而這個節點需要潛在地瞭解這個事件,但事件需要作爲目標到〜2000個節點中的一個。我需要Pub/Sub的任意命名通道的靈活性,但需要MQ的持久性。我從Pub/Sub開始,但網絡太不可靠,所以我已經將解決​​方案移至使用RedisMQServer。我有它的工作,但想確保我不會錯過界面中的東西。我很好奇SS的創造者是否已經考慮過這個用例,如果有的話,那麼討論的結果是什麼?這確實打擊了使用POCO來推動消息消費的結果/行動的概念。也許這是原因?

這裏是我的製片人

public ExpressLightServiceResponse Get(ExpressLightServiceRequest query) 
    { 
     var result = new ExpressLightServiceResponse(); 

     var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run); 
     var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName"); 
     var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public); 

     typeBuilder.DefineDefaultConstructor(MethodAttributes.Public); 

     var newType = typeBuilder.CreateType(); 

     using (var messageProducer = _messageService.CreateMessageProducer()) 
     { 
      var message = MessageFactory.Create(newType.CreateInstance()); 
      messageProducer.Publish(message); 
     } 

     return result; 
    } 

這裏是我的消費者

public class ServerAppHost : AppHostHttpListenerBase 
{ 
    private readonly string _store; 

    public string StoreQueue => $"EventA{_store}"; 

    public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly) 
    { 
     _store = store; 
    } 

    public override void Configure(Container container) 
    { 
     container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString)); 

     var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run); 
     var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName"); 
     var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public); 

     typeBuilder.DefineDefaultConstructor(MethodAttributes.Public); 

     var newType = typeBuilder.CreateType(); 

     var mi = typeof(Temp).GetMethod("Foo"); 
     var fooRef = mi.MakeGenericMethod(newType); 
     fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null); 
    } 
} 

public class Temp 
{ 
    private readonly IRedisClientsManager _redisClientsManager; 

    public Temp(IRedisClientsManager redisClientsManager) 
    { 
     _redisClientsManager = redisClientsManager; 
    } 

    public void Foo<T>() 
    { 
     var mqService = new RedisMqServer(_redisClientsManager); 
     mqService.RegisterHandler<T>(DoWork); 
     mqService.Start(); 
    } 

    private object DoWork<T>(IMessage<T> arg) 
    { 
     //Do work 
     return null; 
    } 
} 

什麼這給了我是的Pub/Sub與隊列的耐久性的靈活性。有沒有人看到/瞭解更「本土」的方式來實現這一目標?

回答

0

應該只有在你APPHOST註冊1個MQ主機所以我會先刪除它從你的包裝類,並有它只是註冊的處理程序,如:

public override void Configure(Container container) 
{ 
    //... 

    container.Register<IMessageService>(
     c => new RedisMqServer(c.Resolve<IRedisClientsManager>()); 
    var mqServer = container.Resolve<IMessageService>(); 

    fooRef.Invoke(new Temp(mqServer), null); 

    mqServer.Start(); 
} 

public class Temp 
{ 
    private readonly IMessageService mqServer; 
    public Temp(IMessageService mqServer) 
    { 
     this.mqServer = mqServer; 
    } 

    public void Foo<T>() => mqService.RegisterHandler<T>(DoWork); 
} 

但這種做法是不非常適合ServiceStack,它鼓勵使用代碼優先的消息來定義客戶端/服務器用來處理髮送和接收的消息的服務合約。因此,如果你想使用ServiceStack發送自定義消息,我建議或者每個消息都有一個單獨的類,否則應該有一個通用類型,如SendEvent,其中消息或事件類型是該類的一個屬性。

否則,如果你想繼續使用自定義消息不使用RedisMqServer,你可以只使用一個dedicated MQ like Rabbit MQ或者如果你喜歡使用Redis List directly - 這是數據結構,所有的Redis MQ的使用下方。