2012-05-01 106 views
7

我想使用Rx從TCPClient接收流中讀取數據並將數據解析爲由換行符「\ r \ n」分隔的字符串IObservable以下是我如何從套接字接收流......可觀察網絡IO解析

var messages = new Subject<string>(); 

var functionReceiveSocketData = 
      Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int> 
      (client.Client.BeginReceive, client.Client.EndReceive); 

Func<byte[], int, byte[]> copy = (bs, n) => 
     { 
      var rs = new byte[buffer.Length]; 
      bs.CopyTo(rs, 0); 
      return rs; 
     }; 

Observable 
    .Defer(() => 
      { 
       var buffer = new byte[50]; 
       return 
        from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None) 
       select copy(buffer, n); 
      }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x))); 

這是我想出來解析字符串。目前這不工作...

obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

郵件主題中塊接收消息,所以我想他們Concat的和測試連接字符串是否包含換行符,從而信號緩衝,關閉和輸出緩衝塊。不知道爲什麼它不起作用。似乎我只能從obsStrings中獲得第一個塊。

所以我正在尋找兩件事。我想簡化io流的閱讀並消除消息主題的使用。其次,我想讓我的字符串解析工作。我一直在對此進行黑客攻擊,無法提供可行的解決方案。我是Rx的初學者。

編輯:這裏是成品的問題解決了之後....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None) 
      .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray()) 
      .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
      .Where(x => x.EndsWith("\r\n")) 
      .Select(buffered => String.Join("", buffered)) 
      .Select(a => a.Replace("\n", "")); 

「ReceiveUntilCompleted」 是從RXX項目的延伸。

回答

3
messages 
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
    .Where(x => x.EndsWith("\r\n")) 
+0

我發現我並不需要.Buffer(1);最後。 – TK3

+0

從答案中刪除它。 – ronag

1

而不是Subscribe並使用Subject,你可以嘗試只Select

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

現在假設這一切已經進入一個新的觀察到所謂的messages,你的下一個問題是,在這條線

var obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

您正在使用BufferScan,並且正在嘗試在兩者中執行相同的操作!請注意,Buffer需要關閉選擇器。

你真正想要的是:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n"))) 
         .Select(buffered => String.Join(buffered)); 

其中給出緩衝的可觀察到的關於何時關閉窗口(當它包含\ r \ n)和給選擇緩衝量來連接。這會導致您的拆分字符串的新觀察結果。

一個問題是,你仍然可以在一個塊的中間有新的線路,這會導致問題。一個簡單的想法是觀察上的字符,而不是完整的字符串塊,如:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

然後,你可以做messages.Where(c => c != '\r')跳過\r並更改緩衝區:

var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n'))) 
         .Select(buffered => String.Join("", buffered)); 
+0

檢查我的編輯。仍然收到奇怪的輸出。 – TK3

+0

想在'緩衝區'調用中想要'messages.Where(x => x =='\ n')'not'messages.Where(x => x!='\ n')'。也就是說,它會打破每一個新行的緩衝區。 – yamen

+0

仍然從這個可觀察的東西中得到脫節的塊。我想知道這是否是一個線程問題,測試是在另一個線程上完成的,並且緩衝會繼續或提前中斷以使緩衝變得不確定。 – TK3