在反應擴展我們如何實現開關的IObservable <IObserver<T>>在C#中
IObservable<T> Switch(this IObservable<IObservable<T>> This)
反應擴展我想的
IObserver<T> Switch(this IObservable<IObserver<T>> This)
實現這將切換傳出事件不同的觀察者但 作爲一個單一的觀察員呈現。
在反應擴展我們如何實現開關的IObservable <IObserver<T>>在C#中
IObservable<T> Switch(this IObservable<IObservable<T>> This)
反應擴展我想的
IObserver<T> Switch(this IObservable<IObserver<T>> This)
實現這將切換傳出事件不同的觀察者但 作爲一個單一的觀察員呈現。
此版本處理幾個問題:
有可能導致丟失事件的競爭條件。如果觀察者在一個線程上觀察事件,而源觀察者在另一個線程上產生一個新的觀察者,如果不使用任何類型的同步,則可能最終在其他線程調用之前在一個線程上對當前觀察者調用OnCompleted
OnNext
在同一觀察員。這將導致事件丟失。
與上面相關,默認情況下,觀察者不是線程安全的。您不應該同時打電話給觀察員,否則您將違反主要Rx合同。在沒有任何鎖定的情況下,用戶可能會在currentObserver
上調用OnCompleted
,而另一個線程則在該觀察者上調用OnNext
。開箱即用,這種事情可以通過使用Synchronized Subject來解決。但是因爲我們還需要同步以前的問題,所以我們可以使用簡單的互斥鎖。
我們需要一種方法來取消訂閱源observable。我認爲,當結果觀察者完成(或錯誤)時,由於我們的觀察者已經被告知不再需要事件,所以這是退訂源的好時機。
下面的代碼:
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source)
{
var mutex = new object();
var current = Observer.Create<T>(x => {});
var subscription = source.Subscribe(o =>
{
lock (mutex)
{
current.OnCompleted();
current = o;
}
});
return Observer.Create<T>(
onNext: v =>
{
lock(mutex)
{
current.OnNext(v);
}
},
onCompleted:() =>
{
subscription.Dispose();
lock (mutex)
{
current.OnCompleted();
}
},
onError: e =>
{
subscription.Dispose();
lock (mutex)
{
current.OnError(e);
}
});
}
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> This)
{
IObserver<T> currentObserver = Observer.Create<T>(x => { });
This.Subscribe(o => { currentObserver.OnCompleted(); currentObserver = o; });
return Observer.Create<T>
(onNext: v => currentObserver.OnNext(v)
, onCompleted:() => currentObserver.OnCompleted()
, onError: v => currentObserver.OnError(v));
}
你輟學所有以前的觀察員。可以嗎? –
當我退出前面的時候,我打電話給OnCompleted。這是打算剔除以前的觀察員。 – bradgonesurfing
在你的問題聽起來像「切換離開的事件不同的觀察員」(例如)「切換離開的事件到最後一個觀察員」。它有點混亂(至少對我來說) –
您真的覺得Interlocked.CompareExchange是必需的嗎?你保護什麼樣的競賽條件。參考副本在.net中是原子。 http://stackoverflow.com/questions/5816939/are-reference-assignment-and-reading-atomic-operations – bradgonesurfing
然而,處理訂閱似乎是需要的,因爲你已經加入了。 – bradgonesurfing
其實我只是意識到CompareExchange是不夠的。有兩種競爭條件:一種是觀察者可能在不同的線程上觀察事件,而不是觀察者正在產生新的觀察者。如果沒有某種保護措施,當觀察者觀察事件時,它可能會抓住舊的'currentObserver',就像它發生改變並將事件發送給錯誤的觀察者。另一個問題是它可能在你完成觀察者之後發送事件*,這意味着事件將會丟失。我會更新我的答案以使用實際鎖定。 – Brandon