正確的解決方案,我實現我自己的發佈/訂閱模式實現與WCF與WSDualHttpBinding的代碼,但我已經超時一個小問題,我在後面解釋,現在讓我告訴我」在做:WCF的發佈/訂閱模式實現
public interface IEventSubscriber
{
[OperationContract]
void NotifyEvent(EventNotification notification);
[OperationContract]
void NotifyServiceDisconnecting();
}
[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IEventSubscriber))]
public interface IEventPublisherService
{
[OperationContract(IsOneWay = false, IsInitiating = true)]
void Subscribe(string culture);
[OperationContract(IsOneWay = false, IsInitiating = false, IsTerminating = true)]
void Unsubscribe();
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
internal class EventPublisherServiceImpl : IEventPublisherService
{
ServiceHost host;
public bool StartService()
{
bool ret = false;
try
{
Uri baseAddress = new Uri(ConfigurationManager.AppSettings[GlobalConstants.CfgKeyConfigEventPublishserServiceBaseAddress].ToString());
EventHelper.AddEvent(string.Format("Event Publisher Service on: {0}", baseAddress.ToString()));
host = new ServiceHost(this, baseAddress);
// duplex session enable http binding
WSDualHttpBinding httpBinding = new WSDualHttpBinding(WSDualHttpSecurityMode.None);
httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(10);
httpBinding.ReliableSession = new ReliableSession();
httpBinding.ReliableSession.Ordered = true;
httpBinding.ReliableSession.InactivityTimeout = TimeSpan.FromMinutes(10);
host.AddServiceEndpoint(typeof(IEventPublisherService), httpBinding, baseAddress);
// Enable metadata publishing.
ServiceMetadataBehavior smb = new ServiceMetadataBehavior();
smb.HttpGetEnabled = true;
smb.MetadataExporter.PolicyVersion = PolicyVersion.Policy15;
host.Description.Behaviors.Add(smb);
// Open the ServiceHost to start listening for messages.
host.Open();
ret = true;
}
catch (Exception e)
{
EventHelper.AddException(e.Message);
}
return ret;
}
...
}
現在在我的實現類,我有存儲在存儲器中,當新通知到來對每個用戶進行下面的代碼的用戶的列表:
...
/// <summary>
/// List of active subscribers
/// </summary>
private static Dictionary<IEventSubscriber, string> subscribers = new Dictionary<IEventSubscriber, string>();
...
我像這樣使用它:
internal void Subscribe(string culture)
{
lock (subscribers)
{
// Get callback contract as specified on the service definition
IEventSubscriber callback = OperationContext.Current.GetCallbackChannel<IEventSubscriber>();
// Add subscriber to the list of active subscribers
if (!subscribers.ContainsKey(callback))
{
subscribers.Add(callback, culture);
}
}
}
...
private void OnNotificationEvent(NormalEvent notification)
{
lock (subscribers)
{
List<IEventSubscriber> listToRemove = new List<IEventSubscriber>();
// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
Parallel.ForEach(subscribers, kvp =>
{
try
{
kvp.Key.NotifyEvent(new EventNotification(notification, kvp.Value));
}
catch (Exception ex)
{
EventHelper.AddException(string.Format("Error notifying event notification to client: {0} - removing this one", ex.Message));
listToRemove.Add(kvp.Key);
}
} //close lambda expression
); //close method invocation
Parallel.ForEach(listToRemove, subs =>
{
try
{
subs.NotifyServiceDisconnecting();
}
catch (Exception ex) {
EventHelper.AddException(string.Format("Failed to notify client that is to be removed: {0}",
ex.Message));
}
subscribers.Remove(subs);
}
);
}
}
這有什麼問題,如果超時得以實現(請注意,我設置10分鐘,ReceiveTimeout和非活動超時)的用戶是在列表中去故障狀態,以下異常是逮住OnNotificationEvent
* 無法完成操作'NotifyEvent',因爲會話通道超時等待接收消息。要增加超時,請在配置文件中的綁定上設置receiveTimeout屬性,或者直接在綁定上設置ReceiveTimeout屬性。 *
好吧,我可以增加超時值,但如果我這樣做會發生一些時間,最終。
我的問題是:我做錯了什麼努力來實現這個模式的時候?或者以任何其他方式實現這種模式是避免此問題的更好方法?或存在重新連接故障回調通道的任何方式(對於我讀什麼是不可能的,但由於我不能通知的連接中斷,讓客戶盲目的,不知道的是,通信結束的客戶端!?或者是給知識,他失去了通信與發佈!?)
當然像ping消息解決方案已經過時了:),但是沒關係,如果沒有更好的出現看起來像我已經實現類似的東西辦法...
感謝
確定,但paralel.ForEach做同樣的,你使用線程,或similiar做,不創建的所有線程做了較爲不錯的工作與,如果我有2000客戶端我不想開始2000線程我是對的還是不是? – Nuno
AFAIR沒有保證Paralel.ForEach執行的單獨的線程每次迭代。是的,它看起來不那麼難看,但我已經展示了工作系統的一部分代碼。您可以使用ThreadPool而不是創建Thread對象。只需嘗試在單獨的線程中調用訂閱者NotifyEvent並測試它對2k元素的工作方式。我想超時會消失。另外我想說的是,這是跨進程調用,您應該記住,2k訂戶的大量evenets會導致您的網絡被Paralel.ForEach或Threads重載。 – oleksa
但我指着超時是ReceiveTimeout和InactivityTimeout不執行超時,但確定我會嘗試用線程來做到這一點 – Nuno