2015-11-10 69 views
1

我想知道是否有辦法採取可觀察的流並使用* While運算符,特別是TakeWhile,SkipWhile和BufferWhile,以便它們的訂閱者不會收到一個.OnComplete當bool'while'條件滿員時?「永不停息」不經意間,BufferWhile和SkipWhile RX.Net序列

當我開始使用.TakeWhile/SkipWhile和BufferWhile運算符時,我認爲它們不會終止/ .OnComplete(),但僅在布爾條件滿足時纔會發出(不)。

它可能會更有意義用一個例子:

我有指示,即一個實例是否忙或不和數據的可觀察到的流的布爾標誌:

private bool IsBusy { get;set; } 
private bool IgnoreChanges { get;set; } 

private IObservable<int> Producer { get;set; } 
private IDisposable ConsumerSubscription { get;set; } 

..和使用/設置在RX流(多個)類似的(簡化的)

private void SetupRx() 
{ 
    ConsumerSubscription = Producer 
     .SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false 
     .BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s) 
     .Subscribe(i => DoSomething(i)); 
} 

private void DoSomething(int i) 
{ 
    try 
    { 
     IsBusy = true; 
     // ... do something 
    } 
    finally 
    { 
     IsBusy = false; 
    } 
} 

的.SkipeWhile/.BufferWhile不應完整/的onComplete(..)每當IsBusy/IgnoreChanges標誌SWI從真到假再回來,但保持流活着。

這是不知何故可以用RX.Net開箱即可和/或有人知道如何做到這一點?

+0

只是爲了澄清,RX.net中沒有BufferWhile(我的錯誤) –

回答

4

要從IObservable<T>源丟棄OnCompleted消息,ConcatObservable.Never<T>()簡單:

source.TakeWhile(condition).Concat(Observable.Never<T>()) 

手動訂閱的IObservable<T>源,認購結束,只有當你手動退訂,你可以使用PublishIConnectableObservable<T>

var connectableSource = source.Publish(); 
// To subscribe to the source: 
var subscription = connectableSource.Connect(); 
... 
// To unsubscribe from the source: 
subscription.Dispose(); 

所有這些說,我認爲你正在接近這個錯誤。如果它正確完成,你將不需要上述技巧。看看你的查詢:

ConsumerSubscription = Producer 
    // Drop the producer's stream of ints whenever the IgnoreChanges flag 
    // is set to true, but forward them whenever the IgnoreChanges flag is set to false 
    .SkipWhile(_ => IgnoreChanges == true) 
    // For all streamed instances buffer them as long as we are busy 
    // handling the previous one(s) 
    .BufferWhile(_ => IsBusy == true) 
    .Subscribe(i => DoSomething(i)); 

您應該使用.Where(_ => !IgnoreChanges)代替.SkipWhile(_ => IgnoreChanges)

您應該使用.Buffer(_ => IsBusy.SkipWhile(busy => busy))BehaviorSubject<bool> IsBusy而不是.BufferWhile(_ => IsBusy)

完整的代碼應該是這樣的:

private BehaviorSubject<bool> IsBusy { get;set; } 
private bool IgnoreChanges { get;set; } 

private IObservable<int> Producer { get;set; } 
private IDisposable ConsumerSubscription { get;set; } 

private void SetupRx() 
{ 
    ConsumerSubscription = Producer 
     .Where(_ => !IgnoreChanges) 
     .Buffer(_ => IsBusy.SkipWhile(busy => busy)) 
     .Subscribe(buffer => DoSomething(buffer)); 
} 

private void DoSomething(IList<int> buffer) 
{ 
    try 
    { 
     IsBusy.OnNext(true); 
     // Do something 
    } 
    finally 
    { 
     IsBusy.OnNext(false); 
    } 
} 

下一個改進是試圖擺脫BehaviorSubject<bool> IsBusy的。主題是你想要避免的,因爲它們是你必須管理的狀態。

+0

感謝蒂莫西提供的關於簡化我的想法的提示!我注意到在RX中沒有BufferWhile,所以我弄了一點,爲我的三個用例寫了三個擴展方法:https://github.com/jbattermann/JB.Common/blob/master/JB。 Common.Reactive/Linq/ObservableExtensions.cs ...SkipWhile/TakeWhile實際上非常容易編寫,對於(我的).Buffer雖然還不滿意,但是很好。但是總的來說 - 感謝對Publish,Where等方面的回覆和說明。乾杯! –