回答

10

的Rx已經有運營商,以滿足您的需求 - 以及兩人竟 - Publish & RefCount

下面是如何使用它們:

IObservable xs = ... 

var rxs = xs.Publish().RefCount(); 

var sub1 = rxs.Subscribe(x => { }); 
var sub2 = rxs.Subscribe(x => { }); 

//later 
sub1.Dispose(); 

//later 
sub2.Dispose(); 

//The underlying subscription to `xs` is now disposed of. 

簡單。

1

如果我已經理解了你的問題,你想創建observable,這樣當所有的訂閱者都處理了它們的訂閱,即沒有更多的訂閱者,那麼你想執行一個清理功能,這將停止生產中的observable其他值。 如果這是你想要的,那麼你可以這樣做如下:

//Wrap a disposable 
public class WrapDisposable : IDisposable 
    { 
     IDisposable disp; 
     Action act; 
     public WrapDisposable(IDisposable _disp, Action _act) 
     { 
      disp = _disp; 
      act = _act; 
     } 
     void IDisposable.Dispose() 
     { 
      act(); 
      disp.Dispose(); 
     } 
    } 

    //Observable that we want to clean up after all subs are done 
    public static IObservable<long> GenerateObs(out Action cleanup) 
    { 
     cleanup =() => 
     { 
      Console.WriteLine("All subscribers are done. Do clean up"); 
     }; 
     return Observable.Interval(TimeSpan.FromSeconds(1)); 
    } 
    //Wrap the observable 
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone) 
    { 
     int count = 0; 
     return Observable.CreateWithDisposable<T>(ob => 
     { 
      var disp = obs.Subscribe(ob); 
      Interlocked.Increment(ref count); 
      return new WrapDisposable(disp,() => 
      { 
       if (Interlocked.Decrement(ref count) == 0) 
       { 
        onAllDone();             
       } 
      }); 
     }); 
    } 

//用例:

Action cleanup; 
var obs = GenerateObs(out cleanup); 
var newObs = WrapToClean(obs, cleanup); 
newObs.Take(6).Subscribe(Console.WriteLine); 
newObs.Take(5).Subscribe(Console.WriteLine); 
相關問題