這是我的場景:訂閱動態消息
我們有一箇中央辦公室和一些地點(50和成長)。我們收到了當前從中心辦公室發佈的消息,機器在這些地點訂閱。目前,每個位置都將收到此類型的所有已發佈消息。這些消息具有locationnumber屬性,用於標識特定消息與哪個位置相關,並且在該位置的客戶端上,任何不具有匹配位置編號的消息都將被忽略。我想要做的是根據用戶所在的位置動態創建一個消息類,並讓它訂閱該類型的消息。然後,發佈者將檢查數據並生成「相同的」動態消息類(不確定它是否匹配,還沒有到達這個過程中)併發布該消息,以便只有實際上對此感興趣的位置消息發送給他們。這可能嗎?我是否對NServiceBus完全錯誤,是否有另一種做這種類型的過濾的方法? 我目前停留在試圖訂閱我創建的動態類型,我得到這個錯誤,當我嘗試:
No destination could be found for message type DerivedClassOne. Check the <MessageEndpointMappings> section of the configuration of this endpoint for an entry either for this specific message type or for its assembly.
這裏是我的配置文件:
<configuration>
<configSections>
<section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
</configSections>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.1" />
</startup>
<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo="">
<MessageEndpointMappings>
<!-- publishers don't need to set this for their own message types -->
<add Messages="NSBDynamicSubscriptionSpike.Messages" Endpoint="basequeue" />
</MessageEndpointMappings>
</UnicastBusConfig>
<appSettings>
<add key="MessageClassName" value="DerivedClassOne"/>
</appSettings>
</configuration>
這裏是我的代碼:
namespace NSBDynamicSubscriptionSpike.Messages
{
public class BaseMessageClass : IMessage
{
public string BaseStringProp { get; set; }
public int BaseIntProp { get; set; }
}
}
namespace NSBDynamicSubscriptionSpike.Server
{
public class MessageHandler : AsA_Server,
IWantCustomInitialization,
IConfigureThisEndpoint,
IWantToRunWhenBusStartsAndStops
{
private Type myMessageType;
public IBus Bus { get; set; }
public void Handle(BaseMessageClass message)
{
BaseMessageClass m = (BaseMessageClass) message;
Console.WriteLine(string.Format("Message type: {0}", m.GetType()));
Console.WriteLine(string.Format("{0}: {1}", m.BaseIntProp, m.BaseStringProp));
}
private bool HandleMessage(object message)
{
Handle((BaseMessageClass)message);
return true;
}
public void Init()
{
NServiceBus.Configure.With()
.DefaultBuilder()
.Log4Net()
.MsmqTransport()
.UnicastBus()
.BinarySerializer()
.InMemorySubscriptionStorage()
.UseInMemoryTimeoutPersister()
.InMemoryFaultManagement()
.InMemorySagaPersister();
}
private Type GenerateDynamicType()
{
string dcName = ConfigurationManager.AppSettings["MessageClassName"];
AssemblyName aName = new AssemblyName(dcName);
AssemblyBuilder ab = AppDomain.CurrentDomain.DefineDynamicAssembly(aName, AssemblyBuilderAccess.Run);
ModuleBuilder mb = ab.DefineDynamicModule(dcName);
TypeBuilder tb = mb.DefineType(dcName,TypeAttributes.Public | TypeAttributes.Class, typeof(BaseMessageClass));
}
public void Start()
{
myMessageType = GenerateDynamicType();
//I tried this method first to get the endpoint set up for this type
MessageEndpointMappingCollection mappings = new MessageEndpointMappingCollection();
MessageEndpointMapping m;
m = new MessageEndpointMapping();
m.AssemblyName = myMessageType.AssemblyQualifiedName;
m.Messages = myMessageType.FullName;
m.Endpoint = "basequeue";
mappings.Add(m);
IComponentConfig<UnicastBusConfig> busConfig = Configure.Instance.Configurer.ConfigureComponent<UnicastBusConfig>(ComponentCallModelEnum.None);
busConfig.ConfigureProperty(u => u.MessageEndpointMappings, mappings);
//I also tried this from another SO question I found somewhat related, pick one or the other, not both (I think)
var ucb = Configure.Instance.Configurer.ConfigureComponent<NServiceBus.Unicast.UnicastBus>(ComponentCallModelEnum.Singleton);
ucb.ConfigureProperty(u => u.MessageOwners, new Dictionary<string, string>()
{
{myMessageType.AssemblyQualifiedName, "basequeue"}
});
//the line below is what throws the error
Bus.Subscribe(myMessageType, HandleMessage);
}
public void Stop()
{
Bus.Unsubscribe(myMessageType);
}
}
}
同一局域網中的所有50個位置?或者,這些消息是否需要通過NSB網關傳輸? –
他們在同一個域中,但是超過了t1,這就是爲什麼我想減少發送的消息,但是不需要網關。 – BlackICE