讓我先解釋一下我想達到的目標。以Rx爲基礎的數據緩衝
可以說我有以下數據傳入形式的事件流
var data = new string[] {
"hello",
"Using",
"ok:michael",
"ok",
"begin:events",
"1:232",
"2:343",
"end:events",
"error:dfljsdf",
"fdl",
"error:fjkdjslf",
"ok"
};
當我訂閱的數據源,我想獲得以下結果
"ok:michael"
"ok"
"begin:events 1:232 2:343 end:events"
"error:dfljsdf"
"error:fjkdjslf"
"ok"
基本上,我想得到以開頭的數據爲OK或錯誤和數據開始和結束。
到目前爲止,我已經試過這個..
var data = new string[] {
"hello",
"Using",
"ok:michael",
"ok",
"begin:events",
"1:232",
"2:343",
"end:events",
"error:dfljsdf",
"fdl",
"error:fjkdjslf",
"ok"
};
var dataStream = Observable.Generate(
data.GetEnumerator(),
e => e.MoveNext(),
e => e,
e => e.Current.ToString(),
e => TimeSpan.FromSeconds(0.1));
var onelineStream = from d in dataStream
where d.StartsWith("ok") || d.StartsWith("error")
select d;
// ???
// may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events"
// but it is not working...
var multiLineStream = from list in dataStream.Buffer<string, string, string>(
bufferOpenings: dataStream.Where(d => d.StartsWith("begin")),
bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end")))
select String.Join(" ", list);
// merge two stream????
// but I have no clue how to merge these twos :(
mergeStream .Subscribe(d =>
{
Console.WriteLine(d);
Console.WriteLine();
});
因爲我很新的反應式編程,我不能讓自己在被動的方式思考。 :(
在此先感謝。
我希望我可以upvote 10倍。 :) –
爲其他訪問者的信息,如果'dataStream'是從事件創建的(在我的情況下,'Observable.FromEvent'工廠方法),'發佈'方法不是必需的。 –
這真的是一個很好的答案,以非常具有教育意義的方式全面覆蓋了原始代碼中的所有問題。充分的信貸。 – yamen