2017-09-26 144 views
2

我想要消耗一個可隨時填充的IObservable。從頂部排出IObservable

我有這樣的擴展方法:

public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
{ 
    return Observable.Defer(() => 
    { 
     BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 

     return source 
      .Zip(queue, (v, q) => v) 
      .SelectMany(v => selector(v) 
       .Do(_ => 
       { 

       },() => 
       { 
        queue.OnNext(new Unit()); 
       }) 
      ); 
    }); 
} 

我像如下使用:

_moviesToTranslateObservable = new Subject<IMovie>(); 
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250))) 
    .Subscribe(async movieToTranslate => 
     { 
     } 

一旦新項目推:

_moviesToTranslateObservable.OnNext(movieToTranslate); 

的的IObservable是消耗。

我的問題是,當我添加很多項目時,我不想消耗第一個已添加的,但最後添加的(如堆棧,而不是隊列)。

我該如何做到這一點? BehaviorSubject是否適合堆棧消耗行爲?

回答

2

我知道變量名稱說queue,但那BehaviorSubject不是一個真正的隊列,它更像是一個鎖。排隊真的發生在Zip函數中,該函數攜帶一個內部隊列。至於FIFO和LIFO之間的切換,我不確定你想要什麼標準,但這裏有一個FIFO版本Drain

public static IObservable<TOut> DrainReverse<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
{ 
    return Observable.Defer(() => 
    { 
     BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 
     var stack = new Stack<TSource>(); 

     return source 
      .Do(item => stack.Push(item)) 
      .Zip(queue, (v, q) => v) 
      .Select(_ => stack.Pop()) 
      .SelectMany(v => selector(v) 
       .Do(_ => 
       { 

       },() => 
       { 
        queue.OnNext(new Unit()); 
       }) 
      ); 
    }); 
} 

當與以下運行的代碼中使用:

var s = new Subject<int>(); 
var d = s.DrainReverse(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(250))); 
d.Subscribe(i => Console.WriteLine(i)); 
s.OnNext(0); 
s.OnNext(1); 
s.OnNext(2); 
s.OnNext(3); 
s.OnNext(4); 
s.OnNext(5); 

哪個正確產生0, 5, 4, 3, 2, 1

+0

正是我一直在尋找!非常感謝 – Ben