2011-09-15 173 views
0

正確的解決方案,我實現我自己的發佈/訂閱模式實現與WCF與WSDualHttpBinding的代碼,但我已經超時一個小問題,我在後面解釋,現在讓我告訴我」在做:WCF的發佈/訂閱模式實現

public interface IEventSubscriber 
{ 
    [OperationContract] 
    void NotifyEvent(EventNotification notification); 
    [OperationContract] 
    void NotifyServiceDisconnecting(); 
} 

[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IEventSubscriber))] 
public interface IEventPublisherService 
{ 
    [OperationContract(IsOneWay = false, IsInitiating = true)] 
    void Subscribe(string culture); 
    [OperationContract(IsOneWay = false, IsInitiating = false, IsTerminating = true)] 
    void Unsubscribe(); 
} 

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 
internal class EventPublisherServiceImpl : IEventPublisherService 
{ 
    ServiceHost host; 
    public bool StartService() 
    { 
     bool ret = false; 
     try 
     { 
      Uri baseAddress = new Uri(ConfigurationManager.AppSettings[GlobalConstants.CfgKeyConfigEventPublishserServiceBaseAddress].ToString()); 
      EventHelper.AddEvent(string.Format("Event Publisher Service on: {0}", baseAddress.ToString())); 

      host = new ServiceHost(this, baseAddress); 

      // duplex session enable http binding 
      WSDualHttpBinding httpBinding = new WSDualHttpBinding(WSDualHttpSecurityMode.None); 
      httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(10); 
      httpBinding.ReliableSession = new ReliableSession(); 
      httpBinding.ReliableSession.Ordered = true; 
      httpBinding.ReliableSession.InactivityTimeout = TimeSpan.FromMinutes(10); 
      host.AddServiceEndpoint(typeof(IEventPublisherService), httpBinding, baseAddress); 

      // Enable metadata publishing. 
      ServiceMetadataBehavior smb = new ServiceMetadataBehavior(); 
      smb.HttpGetEnabled = true; 
      smb.MetadataExporter.PolicyVersion = PolicyVersion.Policy15; 
      host.Description.Behaviors.Add(smb); 

      // Open the ServiceHost to start listening for messages. 
      host.Open(); 
      ret = true; 
     } 
     catch (Exception e) 
     { 
      EventHelper.AddException(e.Message); 
     } 

     return ret; 
    } 
... 

} 

現在在我的實現類,我有存儲在存儲器中,當新通知到來對每個用戶進行下面的代碼的用戶的列表:

... 
    /// <summary> 
    /// List of active subscribers 
    /// </summary> 
    private static Dictionary<IEventSubscriber, string> subscribers = new Dictionary<IEventSubscriber, string>(); 

... 

我像這樣使用它:

internal void Subscribe(string culture) 
    { 
     lock (subscribers) 
     { 
      // Get callback contract as specified on the service definition 
      IEventSubscriber callback = OperationContext.Current.GetCallbackChannel<IEventSubscriber>(); 

      // Add subscriber to the list of active subscribers 
      if (!subscribers.ContainsKey(callback)) 
      { 
       subscribers.Add(callback, culture); 
      } 
     } 
    } 

... 

    private void OnNotificationEvent(NormalEvent notification) 
    { 
     lock (subscribers) 
     { 
      List<IEventSubscriber> listToRemove = new List<IEventSubscriber>(); 
      // Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body) 
      Parallel.ForEach(subscribers, kvp => 
      { 
       try 
       { 
        kvp.Key.NotifyEvent(new EventNotification(notification, kvp.Value)); 
       } 
       catch (Exception ex) 
       { 
        EventHelper.AddException(string.Format("Error notifying event notification to client: {0} - removing this one", ex.Message)); 
        listToRemove.Add(kvp.Key); 
       } 

      } //close lambda expression 
      ); //close method invocation 

      Parallel.ForEach(listToRemove, subs => 
      { 
       try 
       { 
        subs.NotifyServiceDisconnecting(); 
       } 
       catch (Exception ex) { 
        EventHelper.AddException(string.Format("Failed to notify client that is to be removed: {0}", 
         ex.Message)); 
       } 
       subscribers.Remove(subs); 
      } 
      ); 
     } 
    } 

這有什麼問題,如果超時得以實現(請注意,我設置10分鐘,ReceiveTimeout和非活動超時)的用戶是在列表中去故障狀態,以下異常是逮住OnNotificationEvent

* 無法完成操作'NotifyEvent',因爲會話通道超時等待接收消息。要增加超時,請在配置文件中的綁定上設置receiveTimeout屬性,或者直接在綁定上設置ReceiveTimeout屬性。 *

好吧,我可以增加超時值,但如果我這樣做會發生一些時間,最終。

我的問題是:我做錯了什麼努力來實現這個模式的時候?或者以任何其他方式實現這種模式是避免此問題的更好方法?或存在重新連接故障回調通道的任何方式(對於我讀什麼是不可能的,但由於我不能通知的連接中斷,讓客戶盲目的,不知道的是,通信結束的客戶端!?或者是給知識,他失去了通信與發佈!?)

當然像ping消息解決方案已經過時了:),但是沒關係,如果沒有更好的出現看起來像我已經實現類似的東西辦法...

感謝

回答

1

對於現在的解決辦法是改變超時有無限的價值:

  // duplex session enable http binding 
      WSDualHttpBinding httpBinding = new WSDualHttpBinding(WSDualHttpSecurityMode.None); 
      httpBinding.ReceiveTimeout = TimeSpan.MaxValue; 
      httpBinding.ReliableSession = new ReliableSession(); 
      httpBinding.ReliableSession.Ordered = true; 
      httpBinding.ReliableSession.InactivityTimeout = TimeSpan.MaxValue; 
0

您正在使用Parallel.ForEach,但我不知道這是不夠的。 AFAIR Parallel.ForEach不會在單獨的線程中執行每個迭代。

我想建議,開始獨立的線程在OnNotificationEvent每個用戶,並使用鎖,以確保的foreach不會被收集來breaked修改例外:

lock (_subscribersSync) 
     foreach (var chatter in subscribers) 
     { 
       Logger.Log.DebugFormat("putting all users to {0}", subscribers.Key.Name); 
       Thread th = new Thread(PublishAllUserMessage); 
       th.Start(new MessageData() { Message = "", Subscriber = chatter.Key}; 
     } 

void PublishAllUserMessage(object messageData) 
{ 
     MessageData md = (MessageData)messageData; 
     try 
     { 
       md.Subscriber.NotifyEvent(...event parameters here ...); 
     } 
     catch (Exception ex) 
     { 
       Logger.Log.Error(string.Format("failed to publish message to '{0}'", md.Subscriber.Name), ex); 
       KickOff(md.Subscriber); 
     } 
} 
object _subscribersSync = new object(); 
void KickOff(IEventSubscriber p) 
{ 
     lock (_subscribersSync) 
     { 
       subscribers.Remove(p); 
       Logger.Log.WarnFormat("'{0}' kicked off", p.Name); 
     } 
} 
public class MessageData 
{ 
     public string Message; 
     public IEventSubscriber Subscriber; 
} 
+0

確定,但paralel.ForEach做同樣的,你使用線程,或similiar做,不創建的所有線程做了較爲不錯的工作與,如果我有2000客戶端我不想開始2000線程我是對的還是不是? – Nuno

+0

AFAIR沒有保證Paralel.ForEach執行的單獨的線程每次迭代。是的,它看起來不那麼難看,但我已經展示了工作系統的一部分代碼。您可以使用ThreadPool而不是創建Thread對象。只需嘗試在單獨的線程中調用訂閱者NotifyEvent並測試它對2k元素的工作方式。我想超時會消失。另外我想說的是,這是跨進程調用,您應該記住,2k訂戶的大量evenets會導致您的網絡被Paralel.ForEach或Threads重載。 – oleksa

+0

但我指着超時是ReceiveTimeout和InactivityTimeout不執行超時,但確定我會嘗試用線程來做到這一點 – Nuno