2014-04-02 20 views
2

對於RX我有一個小問題。我有一串符號從鍵盤進入,我需要將它們分成組。當一個';'應該開始一個新的組,符號來自流。簡單來說,我需要一個像Buffer一樣的運算符,但是當某個條件成立時會觸發,而不是經過一些時間延遲或事件計數。有沒有一種方法可以讓運營商已經出現在RX中,或者我應該自己註冊?帶反應式擴展的分區序列

回答

3

這是源代碼。

var source = new[] { 'a', 'b', ';', 'c', 'd', 'e', ';' }.ToObservable(); 

這裏是你要問什麼:

var groups = source 
    // Group the elements by some constant (0) 
    // and end the group when we see a semicolon 
    .GroupByUntil(x => 0, group => group.Where(x => x == ';')) 

下面是使用它的方式:

groups 
    // Log that we're on the next group now. 
    .Do(x => Console.WriteLine("Group: ")) 
    // Merge/Concat all the groups together 
    // {{a..b..;}..{c..d..e..;}} => {a..b..;..c..d..e..;} 
    .Merge() 
    // Ignore the semicolons? This is optional, I suppose. 
    .Where(x => x != ';') 
    // Log the characters! 
    .Do(x => Console.WriteLine(" {0}", x)) 
    // Make it so, Number One! 
    .Subscribe(); 

輸出:

Group: 
    a 
    b 
Group: 
    c 
    d 
    e 
+0

不會GroupByUntill創建一個新的序列,將在應用程序的整個生命週期中保持活躍狀態​​,還是我錯過了某些東西? –

+0

@L.E.O,編號GroupBy將在外部訂閱的生命週期中保持對每個新的GroupedObservable的引用。 GroupByUntil將爲每個對象執行相同的操作,或者直到該GroupedObservable被「until」選擇器終止。對於這個實現,只會保留一個GroupedObservable,因爲我們正在按常量進行分組。 –

+0

實際上,這與尼古拉的答案完全相同,除了它只使用一個操作員和一個訂閱。 –

2

我們可以利用緩衝區覆蓋邊界可觀察到的地方邊界可觀察是我們的初始流只過濾分號入口。

//this is our incoming stream 
IObservable<char> keyboardStream; 

//if the observable is cold we need to do this 
//in case of it being hot (as I would expect a keyboard stream to be) we do not need to do it 
var persistedStream = keyboardStream.Publish().RefCount(); 

var bufferedBySemicolon = persistedStream.Buffer(persistedStream .Where(c=>c==';')); 
+0

這看起來簡單而有趣 –

+1

我傾向於不贊同RefCount的大多數用法,除非沒有同步要求。涉及同步時,最好明確連接,特別是在處理熱點流時。 –

+0

具體來說,如果您的'keyboardStream'在不同於'bufferedBySemicolon'訂閱的線程上觀察/生成,並且因此呈現爭用條件,則此處使用RefCount是非確定性的。 –

2

這是Nikolai的答案的非RefCount版本。這提供了訂閱和處置的更加明確的同步,並且應該消除當你的消息源在你的消費者訂閱的不同線程上觀察到的競爭狀態(當你處理UI時通常是這種情況)。

var groups = Observable.Create(o => { 

    var publishedSource = source.Publish(); 

    return new CompositeDisposable(
     publishedSource.Buffer(publishedSource.Where(c => c == ';')).Subscribe(o), 
     publishedSource.Connect() 
     ); 

});