2012-05-15 24 views
2

我參考這個例子,在 .NET 中使用 Rx(Reactive Extensions)實現了一個簡單的聊天: https://blogs.claritycon.com/blog/2011/04/roll-your-own-mvc-3-long-polling-chat-site/ (標題:.NET Reactive Extensions——如何實現不會丟失消息的簡單聊天)

其中有一個方法使用長輪詢(long polling)等待新消息到來:

/// <summary>
/// Long-polls for chat messages: queues a thread-pool work item that subscribes to
/// <c>_messages</c>, waits up to <c>MaxWaitSeconds</c> for the first message, and then
/// passes everything received to <paramref name="onMessages"/>.
/// </summary>
/// <param name="onMessages">
/// Callback invoked exactly once with the received messages. The list is empty when the
/// wait times out or when the work item could not be queued.
/// </param>
/// <remarks>
/// The subscription only exists while a poll is in flight, so messages published between
/// two polls are not observed by this method. Avoiding that requires buffering per client
/// outside of this method.
/// </remarks>
public static void CheckForMessagesAsync(Action<List<MessageInfo>> onMessages)
{
    var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm =>
    {
        // A private gate guards msgs: the observer callback runs on the publisher's
        // thread(s), while this work item reads the list. Monitor.Wait/Pulse replaces the
        // AutoResetEvent of the earlier version, which was never disposed.
        var gate = new object();
        var msgs = new List<MessageInfo>();

        using (_messages.Subscribe(msg =>
        {
            lock (gate)
            {
                msgs.Add(msg);
                Monitor.Pulse(gate);
            }
        }))
        {
            lock (gate)
            {
                // A message may already have arrived between Subscribe and taking the
                // gate; only wait when nothing has been received yet.
                if (msgs.Count == 0)
                {
                    // Wait for the max seconds for a new msg
                    Monitor.Wait(gate, TimeSpan.FromSeconds(MaxWaitSeconds));
                }
            }
        }

        // Hand the callback a private copy so a late, in-flight notification cannot
        // mutate the list while the callback is enumerating it.
        List<MessageInfo> received;
        lock (gate)
        {
            received = new List<MessageInfo>(msgs);
        }

        ((Action<List<MessageInfo>>)parm)(received);
    }), onMessages);

    if (!queued)
    {
        onMessages(new List<MessageInfo>());
    }
}

使用這種方法時,在觀察者斷開並被釋放(dispose)之後、重新連接之前出現的消息會丟失。 如何正確實現這個機制,才不會丟失這些消息?

回答

0

我找到了解決方案。我不知道它是否是世界上最優雅的,但它確實可行。 我創建了一個私有字段,使每個用戶都可以有多個會話:

// Registry of ChatServerUserSessions objects, keyed by a long.
// NOTE(review): the key is presumably the user id — confirm against GetChatServerUserSessions.
private ConcurrentDictionary<long, ChatServerUserSessions> _chatServerUserSessionInfoDictionary = new ConcurrentDictionary<long, ChatServerUserSessions>(); 

和會話類:

/// <summary>
/// Groups all sessions that belong to one chat user, together with the lock object that
/// the session code uses to synchronize access to them.
/// </summary>
public class ChatServerUserSessions
{
    /// <summary>
    /// Creates an instance with an empty session map and a fresh lock object, so that
    /// <c>lock (sessions.Lock)</c> and <c>sessions.Sessions</c> are usable even when the
    /// caller does not assign them. Both can still be replaced through their setters or an
    /// object initializer.
    /// </summary>
    public ChatServerUserSessions()
    {
        this.Sessions = new ConcurrentDictionary<string, ChatServerUserSessionInfo>();
        this.Lock = new object();
    }

    /// <summary>Identifier of the user.</summary>
    public long UserId { get; set; }

    /// <summary>Name of the user.</summary>
    public string UserName { get; set; }

    /// <summary>Sessions of this user, keyed by a string (presumably the session id — confirm with callers).</summary>
    public ConcurrentDictionary<string, ChatServerUserSessionInfo> Sessions { get; set; }

    /// <summary>Object used with <c>lock</c> to synchronize access to this user's sessions.</summary>
    public object Lock { get; set; }
}

並且我爲每個會話創建了這個類:

/// <summary>
/// A single session of a chat user. It subscribes to the shared room-activity subject,
/// buffers every activity for a subscribed room in <see cref="MessagesQueue"/>, and then
/// forwards that activity to the observers currently subscribed to this session.
/// All state changes are made while holding the parent <c>ChatServerUserSessions.Lock</c>.
/// </summary>
public class ChatServerUserSessionInfo : IObservable<ChatServerRoomActivityInfoBase>, IDisposable
{
    /// <summary>Identifier of this session.</summary>
    public string SessionId { get; set; }

    /// <summary>Rooms whose activity is buffered and forwarded by this session.</summary>
    public List<long> SubscribedRoomIds { get; set; }

    /// <summary>Date value supplied at construction; not read by this class.</summary>
    public DateTime SubscriptionTicketInvalidationDate { get; set; }

    /// <summary>Activities buffered for this session, in arrival order.</summary>
    public Queue<ChatServerRoomActivityInfoBase> MessagesQueue { get; set; }

    private readonly ChatServerUserSessions parentUserSessions;
    private IDisposable subscription;

    // Set to null by Dispose; a null value therefore means "this session is disposed".
    private List<IObserver<ChatServerRoomActivityInfoBase>> observers;

    /// <summary>
    /// Creates the session and subscribes it to <paramref name="chatServerRoomActivity"/>.
    /// </summary>
    /// <param name="sessionId">Identifier of this session.</param>
    /// <param name="subscriptionTicketInvalidationDate">Initial value of <see cref="SubscriptionTicketInvalidationDate"/>.</param>
    /// <param name="chatServerRoomActivity">Subject that publishes room activity.</param>
    /// <param name="parentUserSessions">Owner of this session; its <c>Lock</c> guards all state of this class.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="chatServerRoomActivity"/> or <paramref name="parentUserSessions"/> is null.
    /// </exception>
    public ChatServerUserSessionInfo(string sessionId, DateTime subscriptionTicketInvalidationDate, Subject<ChatServerRoomActivityInfoBase> chatServerRoomActivity, ChatServerUserSessions parentUserSessions)
    {
        if (chatServerRoomActivity == null)
        {
            throw new ArgumentNullException("chatServerRoomActivity");
        }

        if (parentUserSessions == null)
        {
            throw new ArgumentNullException("parentUserSessions");
        }

        this.SessionId = sessionId;
        this.SubscribedRoomIds = new List<long>();
        this.SubscriptionTicketInvalidationDate = subscriptionTicketInvalidationDate;
        this.MessagesQueue = new Queue<ChatServerRoomActivityInfoBase>();
        this.parentUserSessions = parentUserSessions;
        this.observers = new List<IObserver<ChatServerRoomActivityInfoBase>>();

        // Subscribe last: the callback reads the fields initialized above, so it must not
        // be able to run before they are assigned.
        this.subscription = chatServerRoomActivity.Subscribe(activity => this.OnRoomActivity(activity));
    }

    // No finalizer on purpose: the only resource is a managed subscription, which must not
    // be touched from a finalizer, and the subject's reference to the callback keeps this
    // object reachable for as long as the subscription is active anyway.

    /// <summary>
    /// Unsubscribes from the room-activity subject and drops all observers. Safe to call
    /// more than once.
    /// </summary>
    public void Dispose()
    {
        // Swap the field atomically so concurrent Dispose calls dispose the subscription
        // only once; do it outside the lock to avoid nesting it with the subject's own
        // synchronization.
        IDisposable current = Interlocked.Exchange(ref this.subscription, null);
        if (current != null)
        {
            current.Dispose();
        }

        lock (this.parentUserSessions.Lock)
        {
            this.observers = null;
        }

        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Registers <paramref name="observer"/> to receive every activity that this session
    /// buffers from now on.
    /// </summary>
    /// <returns>A handle that removes the observer when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The session has already been disposed.</exception>
    public IDisposable Subscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        lock (this.parentUserSessions.Lock)
        {
            if (this.observers == null)
            {
                throw new ObjectDisposedException(this.GetType().Name);
            }

            this.observers.Add(observer);
            return new Subscription(this, observer);
        }
    }

    // Handles one activity from the shared subject: buffer it, then notify observers.
    private void OnRoomActivity(ChatServerRoomActivityInfoBase activity)
    {
        lock (this.parentUserSessions.Lock)
        {
            // Ignore notifications that race with Dispose, and rooms this session is not in.
            if (this.observers == null || !this.SubscribedRoomIds.Contains(activity.RoomId))
            {
                return;
            }

            this.MessagesQueue.Enqueue(activity);

            // Iterate over a copy: an observer may dispose its subscription from inside
            // OnNext (the lock is re-entrant), which would modify the list being enumerated.
            foreach (var observer in this.observers.ToArray())
            {
                observer.OnNext(activity);
            }
        }
    }

    // Removes an observer; a no-op once the session has been disposed.
    private void Unsubscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
    {
        lock (this.parentUserSessions.Lock)
        {
            if (this.observers == null)
            {
                return;
            }

            this.observers.Remove(observer);
        }
    }

    // Handle returned by Subscribe; disposing it detaches the observer exactly once.
    private class Subscription : IDisposable
    {
        private ChatServerUserSessionInfo subject;
        private IObserver<ChatServerRoomActivityInfoBase> observer;

        public Subscription(ChatServerUserSessionInfo subject, IObserver<ChatServerRoomActivityInfoBase> observer)
        {
            this.subject = subject;
            this.observer = observer;
        }

        public void Dispose()
        {
            // The atomic swap makes repeated or concurrent Dispose calls harmless.
            IObserver<ChatServerRoomActivityInfoBase> detached = Interlocked.Exchange(ref this.observer, null);
            if (detached == null)
            {
                return;
            }

            this.subject.Unsubscribe(detached);
            this.subject = null;
        }
    }
}

每個用戶會話擁有自己的 MessagesQueue,並訂閱全局的聊天室活動 Subject。聊天室活動消息會針對每個會話分別保留。以下是檢索消息的方法:

/// <summary>
/// Long-polls for chat room activity of the current session. If activities are already
/// buffered in the session's queue they are delivered immediately; otherwise a thread-pool
/// work item waits up to <c>CheckForActivityMaxWaitSeconds</c> for new activity.
/// </summary>
/// <param name="userId">Passed to <c>GetChatServerUserSessions</c> to locate the user's sessions.</param>
/// <param name="userName">Passed to <c>GetChatServerUserSessions</c> together with <paramref name="userId"/>.</param>
/// <param name="onChatRoomActivity">
/// Callback invoked exactly once with the delivered activities. The list is empty when the
/// wait times out or the work item could not be queued.
/// </param>
/// <remarks>
/// Activities are only ever removed from the session queue while holding the sessions lock
/// and before the poll is marked finished, so an activity is either delivered by this poll
/// or left in the queue for the next one. This relies on the session enqueuing an activity
/// before it notifies its observers.
/// </remarks>
public void CheckForChatRoomsActivityAsync(long userId, string userName, Action<List<ChatServerRoomActivityInfoBase>> onChatRoomActivity)
{
    var sessionId = GetCurrentSessionId();
    var chatServerUserSessions = GetChatServerUserSessions(userId, userName);

    lock (chatServerUserSessions.Lock)
    {
        var userSession = GetChatServerUserSessionInfo(sessionId, chatServerUserSessions);
        ProlongSubscriptions(userSession);

        // Fast path: something is already buffered, deliver it right away.
        var backlog = new List<ChatServerRoomActivityInfoBase>();
        DrainPendingActivities(userSession.MessagesQueue, backlog);
        if (backlog.Count > 0)
        {
            onChatRoomActivity(backlog);
            return;
        }

        var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm =>
        {
            var activities = new List<ChatServerRoomActivityInfoBase>();
            var finished = false; // guarded by chatServerUserSessions.Lock
            IDisposable subscriber = null;

            using (var wait = new AutoResetEvent(false))
            {
                try
                {
                    lock (chatServerUserSessions.Lock)
                    {
                        // Activity may have been buffered between the check above and this
                        // work item starting; pick it up instead of waiting.
                        DrainPendingActivities(userSession.MessagesQueue, activities);

                        if (activities.Count == 0)
                        {
                            subscriber = userSession.Subscribe(activity =>
                            {
                                lock (chatServerUserSessions.Lock)
                                {
                                    // Once the poll is finished, leave the activity in the
                                    // queue so the next poll delivers it.
                                    if (finished)
                                    {
                                        return;
                                    }

                                    // Take everything that is buffered, oldest first. The
                                    // notified activity is part of the queue already.
                                    DrainPendingActivities(userSession.MessagesQueue, activities);
                                    wait.Set();
                                }
                            });
                        }
                    }

                    if (subscriber != null)
                    {
                        wait.WaitOne(TimeSpan.FromSeconds(CheckForActivityMaxWaitSeconds));
                    }
                }
                finally
                {
                    // Freeze the result before the wait handle is disposed: after this point
                    // the observer neither touches 'activities' nor calls wait.Set().
                    lock (chatServerUserSessions.Lock)
                    {
                        finished = true;
                    }

                    if (subscriber != null)
                    {
                        subscriber.Dispose();
                    }
                }
            }

            ((Action<List<ChatServerRoomActivityInfoBase>>)parm)(activities);
        }), onChatRoomActivity);

        if (!queued)
        {
            onChatRoomActivity(new List<ChatServerRoomActivityInfoBase>());
        }
    }
}

// Moves every buffered activity from the queue to the end of the target list, in order.
// Callers must hold the sessions lock.
private static void DrainPendingActivities(Queue<ChatServerRoomActivityInfoBase> queue, List<ChatServerRoomActivityInfoBase> target)
{
    while (queue.Count > 0)
    {
        target.Add(queue.Dequeue());
    }
}