2014-03-27 140 views
1

這是我的場景:訂閱動態消息

我們有一箇中央辦公室和一些地點(50和成長)。我們收到了當前從中心辦公室發佈的消息,機器在這些地點訂閱。目前,每個位置都將收到此類型的所有已發佈消息。這些消息具有locationnumber屬性,用於標識特定消息與哪個位置相關,並且在該位置的客戶端上,任何不具有匹配位置編號的消息都將被忽略。我想要做的是根據用戶所在的位置動態創建一個消息類,並讓它訂閱該類型的消息。然後,發佈者將檢查數據並生成「相同的」動態消息類(不確定它是否匹配,還沒有到達這個過程中)併發布該消息,以便只有實際上對此感興趣的位置消息發送給他們。這可能嗎?我是否對NServiceBus完全錯誤,是否有另一種做這種類型的過濾的方法? 我目前停留在試圖訂閱我創建的動態類型,我得到這個錯誤,當我嘗試:

No destination could be found for message type DerivedClassOne. Check the <MessageEndpointMappings> section of the configuration of this endpoint for an entry either for this specific message type or for its assembly.

這裏是我的配置文件:

<configuration> 
    <configSections> 
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" /> 
    </configSections> 
    <startup> 
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.1" /> 
    </startup> 
    <UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo=""> 
    <MessageEndpointMappings> 
     <!-- publishers don't need to set this for their own message types --> 
     <add Messages="NSBDynamicSubscriptionSpike.Messages" Endpoint="basequeue" /> 
    </MessageEndpointMappings> 
    </UnicastBusConfig> 

    <appSettings> 
    <add key="MessageClassName" value="DerivedClassOne"/> 
    </appSettings> 
</configuration> 

這裏是我的代碼:

namespace NSBDynamicSubscriptionSpike.Messages 
{ 
    public class BaseMessageClass : IMessage 
    { 
     public string BaseStringProp { get; set; } 
     public int BaseIntProp { get; set; } 
    } 
} 

namespace NSBDynamicSubscriptionSpike.Server 
{ 
    public class MessageHandler : AsA_Server, 
     IWantCustomInitialization, 
     IConfigureThisEndpoint, 
     IWantToRunWhenBusStartsAndStops 
    { 
     private Type myMessageType; 

     public IBus Bus { get; set; } 

     public void Handle(BaseMessageClass message) 
     { 
      BaseMessageClass m = (BaseMessageClass) message; 
      Console.WriteLine(string.Format("Message type: {0}", m.GetType())); 
      Console.WriteLine(string.Format("{0}: {1}", m.BaseIntProp, m.BaseStringProp)); 
     } 

     private bool HandleMessage(object message) 
     { 
      Handle((BaseMessageClass)message); 
      return true; 
     } 


     public void Init() 
     { 
      NServiceBus.Configure.With() 
       .DefaultBuilder() 
       .Log4Net() 
       .MsmqTransport() 
       .UnicastBus() 
       .BinarySerializer() 
       .InMemorySubscriptionStorage() 
       .UseInMemoryTimeoutPersister() 
       .InMemoryFaultManagement() 
       .InMemorySagaPersister(); 
     } 


     private Type GenerateDynamicType() 
     { 
      string dcName = ConfigurationManager.AppSettings["MessageClassName"]; 
      AssemblyName aName = new AssemblyName(dcName); 
      AssemblyBuilder ab = AppDomain.CurrentDomain.DefineDynamicAssembly(aName, AssemblyBuilderAccess.Run); 
      ModuleBuilder mb = ab.DefineDynamicModule(dcName); 

      TypeBuilder tb = mb.DefineType(dcName,TypeAttributes.Public | TypeAttributes.Class, typeof(BaseMessageClass)); 

     } 

     public void Start() 
     { 
      myMessageType = GenerateDynamicType(); 

      //I tried this method first to get the endpoint set up for this type 
      MessageEndpointMappingCollection mappings = new MessageEndpointMappingCollection(); 
      MessageEndpointMapping m; 
      m = new MessageEndpointMapping(); 
      m.AssemblyName = myMessageType.AssemblyQualifiedName; 
      m.Messages = myMessageType.FullName; 
      m.Endpoint = "basequeue"; 
      mappings.Add(m); 

      IComponentConfig<UnicastBusConfig> busConfig = Configure.Instance.Configurer.ConfigureComponent<UnicastBusConfig>(ComponentCallModelEnum.None); 
      busConfig.ConfigureProperty(u => u.MessageEndpointMappings, mappings); 


      //I also tried this from another SO question I found somewhat related, pick one or the other, not both (I think) 
      var ucb = Configure.Instance.Configurer.ConfigureComponent<NServiceBus.Unicast.UnicastBus>(ComponentCallModelEnum.Singleton); 
      ucb.ConfigureProperty(u => u.MessageOwners, new Dictionary<string, string>() 
      { 
       {myMessageType.AssemblyQualifiedName, "basequeue"} 
      }); 

      //the line below is what throws the error 
      Bus.Subscribe(myMessageType, HandleMessage); 
     } 

     public void Stop() 
     { 
      Bus.Unsubscribe(myMessageType); 
     } 
    } 
} 
+0

同一局域網中的所有50個位置?或者,這些消息是否需要通過NSB網關傳輸? –

+0

他們在同一個域中,但是超過了t1,這就是爲什麼我想減少發送的消息,但是不需要網關。 – BlackICE

回答

1

我不認爲有什麼內置的東西可以幫助你。你要做的是content based routing。不管這是不是一個好主意,NSB並不是設計用於開箱即用(這是一條unicast總線)。

我建議採取另一個方向和裝飾ISubscriptionStorageStorageDrivenPublisher(實現IPublishMessages)使用此接口來獲取將消息發佈到的地址列表。

您的位置編號屬性應成爲標題,而不是類屬性。

包裝器/裝飾器可以檢查一些參數(例如,傳入的位置編號消息標題)並且細化GetSubscriberAddressesForMessage的裝飾實現返回的IEnumerable<Address>

下面一個例子裝飾RavenSubscriptionStorage

public class DecoratedSubscriptionStorage : ISubscriptionStorage 
{ 
    public RavenSubscriptionStorage Base { get; set; } //will be injected 

    void Subscribe(Address address, IEnumerable<MessageType> messageTypes) 
    { 
    Base.Subscribe(address, messageTypes); 
    } 

    void Unsubscribe(Address address, IEnumerable<MessageType> messageTypes) 
    { 
    Base.Unsubscribe(address, messageTypes); 
    } 

    IEnumerable<Address> GetSubscriberAddressesForMessage(
        IEnumerable<MessageType> messageTypes) 
    { 
    var addresses = Base.GetSubscriberAddressesForMessage(messageTypes); 

    //your logic goes here 
    return addresses; 
    } 

    public void Init() 
    { 
    Base.Init(); 
    } 
} 

然而,自己的實現此接口可以動態的那一刻,如果配置由DI框架/引導器裝飾無論實施NSB使用。

+0

看起來是一個很好的嘗試方向,謝謝 – BlackICE