2
我想要消耗一個可隨時填充的IObservable。從頂部排出IObservable
我有這樣的擴展方法:
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ =>
{
},() =>
{
queue.OnNext(new Unit());
})
);
});
}
我像如下使用:
_moviesToTranslateObservable = new Subject<IMovie>();
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250)))
.Subscribe(async movieToTranslate =>
{
}
一旦新項目推:
_moviesToTranslateObservable.OnNext(movieToTranslate);
的的IObservable是消耗。
我的問題是,當我添加很多項目時,我不想消耗第一個已添加的,但最後添加的(如堆棧,而不是隊列)。
我該如何做到這一點? BehaviorSubject是否適合堆棧消耗行爲?
正是我一直在尋找!非常感謝 – Ben