我一直在使用ServiceStack MQ服務器/客戶端來增強我平臺中基於消息的體系結構,並且它一直在完美地工作。我現在試圖做一些我不相信是由SS消息製作者/消費者支持的東西。ServiceStack消息過濾
本質上我是在一個集中的數據中心發射消息(事件),並且在美國有遍佈美國的2000個分佈式節點,而這個節點需要潛在地瞭解這個事件,但事件需要作爲目標到〜2000個節點中的一個。我需要Pub/Sub的任意命名通道的靈活性,但需要MQ的持久性。我從Pub/Sub開始,但網絡太不可靠,所以我已經將解決方案移至使用RedisMQServer。我有它的工作,但想確保我不會錯過界面中的東西。我很好奇SS的創造者是否已經考慮過這個用例,如果有的話,那麼討論的結果是什麼?這確實打擊了使用POCO來推動消息消費的結果/行動的概念。也許這是原因?
這裏是我的製片人
public ExpressLightServiceResponse Get(ExpressLightServiceRequest query)
{
var result = new ExpressLightServiceResponse();
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
using (var messageProducer = _messageService.CreateMessageProducer())
{
var message = MessageFactory.Create(newType.CreateInstance());
messageProducer.Publish(message);
}
return result;
}
這裏是我的消費者
public class ServerAppHost : AppHostHttpListenerBase
{
private readonly string _store;
public string StoreQueue => $"EventA{_store}";
public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly)
{
_store = store;
}
public override void Configure(Container container)
{
container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString));
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
var mi = typeof(Temp).GetMethod("Foo");
var fooRef = mi.MakeGenericMethod(newType);
fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null);
}
}
public class Temp
{
private readonly IRedisClientsManager _redisClientsManager;
public Temp(IRedisClientsManager redisClientsManager)
{
_redisClientsManager = redisClientsManager;
}
public void Foo<T>()
{
var mqService = new RedisMqServer(_redisClientsManager);
mqService.RegisterHandler<T>(DoWork);
mqService.Start();
}
private object DoWork<T>(IMessage<T> arg)
{
//Do work
return null;
}
}
什麼這給了我是的Pub/Sub與隊列的耐久性的靈活性。有沒有人看到/瞭解更「本土」的方式來實現這一目標?