2013-04-15 25 views

回答

3

此版本處理幾個問題:

  • 有可能導致丟失事件的競爭條件。如果觀察者在一個線程上觀察事件,而源觀察者在另一個線程上產生一個新的觀察者,如果不使用任何類型的同步,則可能最終在其他線程調用之前在一個線程上對當前觀察者調用OnCompletedOnNext在同一觀察員。這將導致事件丟失。

  • 與上面相關,默認情況下,觀察者不是線程安全的。您不應該同時打電話給觀察員,否則您將違反主要Rx合同。在沒有任何鎖定的情況下,用戶可能會在currentObserver上調用OnCompleted,而另一個線程則在該觀察者上調用OnNext。開箱即用,這種事情可以通過使用Synchronized Subject來解決。但是因爲我們還需要同步以前的問題,所以我們可以使用簡單的互斥鎖。

  • 我們需要一種方法來取消訂閱源observable。我認爲,當結果觀察者完成(或錯誤)時,由於我們的觀察者已經被告知不再需要事件,所以這是退訂源的好時機。

下面的代碼:

public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source) 
{ 
    var mutex = new object(); 
    var current = Observer.Create<T>(x => {}); 
    var subscription = source.Subscribe(o => 
    { 
     lock (mutex) 
     { 
      current.OnCompleted(); 
      current = o; 
     } 
    }); 

    return Observer.Create<T>(
     onNext: v => 
     { 
      lock(mutex) 
      {     
       current.OnNext(v); 
      } 
     }, 
     onCompleted:() => 
     { 
      subscription.Dispose(); 
      lock (mutex) 
      { 
       current.OnCompleted(); 
      } 
     }, 
     onError: e => 
     { 
      subscription.Dispose(); 
      lock (mutex) 
      { 
       current.OnError(e); 
      } 
     }); 
} 
+0

您真的覺得Interlocked.CompareExchange是必需的嗎?你保護什麼樣的競賽條件。參考副本在.net中是原子。 http://stackoverflow.com/questions/5816939/are-reference-assignment-and-reading-atomic-operations – bradgonesurfing

+0

然而,處理訂閱似乎是需要的,因爲你已經加入了。 – bradgonesurfing

+1

其實我只是意識到CompareExchange是不夠的。有兩種競爭條件:一種是觀察者可能在不同的線程上觀察事件,而不是觀察者正在產生新的觀察者。如果沒有某種保護措施,當觀察者觀察事件時,它可能會抓住舊的'currentObserver',就像它發生改變並將事件發送給錯誤的觀察者。另一個問題是它可能在你完成觀察者之後發送事件*,這意味着事件將會丟失。我會更新我的答案以使用實際鎖定。 – Brandon

1
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> This) 
{ 
    IObserver<T> currentObserver = Observer.Create<T>(x => { }); 

    This.Subscribe(o => { currentObserver.OnCompleted(); currentObserver = o; }); 


    return Observer.Create<T> 
     (onNext: v => currentObserver.OnNext(v) 
     , onCompleted:() => currentObserver.OnCompleted() 
     , onError: v => currentObserver.OnError(v)); 
} 
+0

你輟學所有以前的觀察員。可以嗎? –

+0

當我退出前面的時候,我打電話給OnCompleted。這是打算剔除以前的觀察員。 – bradgonesurfing

+0

在你的問題聽起來像「切換離開的事件不同的觀察員」(例如)「切換離開的事件到最後一個觀察員」。它有點混亂(至少對我來說) –

相關問題