2016-04-20 16 views
4

我正在寫一個套接字服務器,它應該處理從每個連接到它的客戶端接收到的每條消息。處理來自事件的組合觀察值

所有消息都排列在一個可觀察對象中,所以它可以在外面訂閱和觀察。

爲了確保所有客戶端套接字消息都放在我用下面的代碼片段同樣觀察到:

private Subject<IObservable<Msg>> _msgSubject; 
public IObservable<Msg> Messages { get; private set; } 

public SocketServer() 
{ 
    // ... 
    // Initialization of server socket 
    // ... 
    _msgSubject = new Subject<IObservable<Msg>>(); 
    Messages = _msgSubject.Merge(); 
} 

private void OnNewConnection(ISocket socket) 
{ 
    var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(action => action.Invoke, h => socket.OnMessage += h, h => socket.OnMessage -= h); 

    _msgSubject.OnNext(evtObservable); 
} 

現在,我已經檢查了內存的(DE)分配和問題即使套接字已經正確關閉,仍然有相關的可觀察信息被添加到主題中;此外,事件的撤銷登記從未被調用。

所以,這裏的問題是:有沒有辦法強制從主題中刪除「套接字可觀察」?

也許觸發socket observable的OnComplete應該做的工作,但如何?

回答

3

無論如何你都不會公開內部可觀測序列的終止(OnCompleteOnError)。 如果消費者碰巧處置訂閱,您允許註銷事件處理程序,但這是由消費者驅動的。

您是否在ISocket接口上提出了表示關閉套接字的任何事件? 如果是這樣,你可以添加一個Observable.FromEvent包裝,然後使用該可觀察序列來終止OnMessage序列。

var closeObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke, 
    h => socket.OnClose+= h, 
    h => socket.OnClose -= h); 

var msgObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke, 
    h => socket.OnMessage += h, 
    h => socket.OnMessage -= h); 

msgObservable.TakeUntil(closeObservable) 
+0

的確我是的!現在你提到它了,我不相信我沒有想到!謝謝! – Atropo