2012-05-11 31 views
1

讓我先解釋一下我想達到的目標。以Rx爲基礎的數據緩衝

可以說我有以下數據傳入形式的事件流

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 

當我訂閱的數據源,我想獲得以下結果

"ok:michael" 
"ok" 
"begin:events 1:232 2:343 end:events" 
"error:dfljsdf" 
"error:fjkdjslf" 
"ok" 

基本上,我想得到以開頭的數據爲OK或錯誤和數據開始和結束

到目前爲止,我已經試過這個..

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 



      var dataStream = Observable.Generate(
           data.GetEnumerator(), 
           e => e.MoveNext(), 
           e => e, 
           e => e.Current.ToString(), 
           e => TimeSpan.FromSeconds(0.1));   

      var onelineStream = from d in dataStream 
           where d.StartsWith("ok") || d.StartsWith("error") 
           select d; 

      // ??? 
      // may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events" 
      // but it is not working... 
      var multiLineStream = from list in dataStream.Buffer<string, string, string>(
           bufferOpenings: dataStream.Where(d => d.StartsWith("begin")), 
           bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end"))) 
           select String.Join(" ", list); 

      // merge two stream???? 
      // but I have no clue how to merge these twos :(

      mergeStream .Subscribe(d => 
      { 
       Console.WriteLine(d); 
       Console.WriteLine(); 
      }); 

因爲我很新的反應式編程,我不能讓自己在被動的方式思考。 :(

在此先感謝。

回答

6

你是如此,所以非常接近正確答案!

基本上你有onelineStream & multiLineStream查詢恰到好處。

一起合併它們的非常容易,只要做到這一點:

onelineStream.Merge(multiLineStream) 

但是,如果您的疑問下降shor t在Observable.Generate中,您用於介紹值之間的延遲。這創造了一種可觀察的情況,即如果你有多個用戶,那就是「粉碎」這些值。

考慮您的數據和對dataStream外觀定義這個代碼的行爲:

dataStream.Select(x => "!" + x).Subscribe(Console.WriteLine); 
dataStream.Select(x => "@" + x).Subscribe(Console.WriteLine); 

你得到這些值:

!hello 
@Using 
!ok:michael 
@ok 
@1:232 
!begin:events 
@2:343 
!end:events 
!fdl 
@error:dfljsdf 
!error:fjkdjslf 
@ok 

注意,有些得到了由一個訂閱處理和其他人得到處理由另一個。這意味着即使您的查詢恰好正確,它們也只會看到一些數據,因此不像您期望的那樣運行。

您還可以獲取可跳過和重複值的競爭條件。所以最好避免這種可觀察的情況。

一種更好的方法來介紹值之間的延遲要做到這一點:

var dataStream = data.ToObservable().Do(_ => Thread.Sleep(100)); 

現在,這創造了一個「冷」觀察到,這意味着每一個新的用戶將獲得的觀察到這樣從開始一個新的訂閱第一個值。

您的multiLineStream查詢將無法正常工作在冷觀察。

爲了使數據流成爲一個「熱」可觀察的(其在用戶之間共享值),我們使用Publish運算符。

所以,multiLineStream現在看起來是這樣的:

var multiLineStream = 
    dataStream.Publish(ds => 
     from list in ds.Buffer(
      ds.Where(d => d.StartsWith("begin")), 
      b => ds.Where(d => d.StartsWith("end"))) 
     select String.Join(" ", list)); 

那麼你可以得到你的結果,像這樣:

onelineStream.Merge(multiLineStream).Subscribe(d => 
{ 
    Console.WriteLine(d); 
    Console.WriteLine(); 
}); 

這是我得到:

ok:michael 
ok 
begin:events 1:232 2:343 end:events 
error:dfljsdf 
error:fjkdjslf 
ok 

讓我知道這是否適合你。

+0

我希望我可以upvote 10倍。 :) –

+1

爲其他訪問者的信息,如果'dataStream'是從事件創建的(在我的情況下,'Observable.FromEvent'工廠方法),'發佈'方法不是必需的。 –

+1

這真的是一個很好的答案,以非常具有教育意義的方式全面覆蓋了原始代碼中的所有問題。充分的信貸。 – yamen