我想使用Rx從TCPClient接收流中讀取數據並將數據解析爲由換行符「\ r \ n」分隔的字符串IObservable以下是我如何從套接字接收流......可觀察網絡IO解析
var messages = new Subject<string>();
var functionReceiveSocketData =
Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
(client.Client.BeginReceive, client.Client.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
var rs = new byte[buffer.Length];
bs.CopyTo(rs, 0);
return rs;
};
Observable
.Defer(() =>
{
var buffer = new byte[50];
return
from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
select copy(buffer, n);
}).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));
這是我想出來解析字符串。目前這不工作...
obsStrings = messages.Buffer<string,string>(() =>
messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
);
郵件主題中塊接收消息,所以我想他們Concat的和測試連接字符串是否包含換行符,從而信號緩衝,關閉和輸出緩衝塊。不知道爲什麼它不起作用。似乎我只能從obsStrings中獲得第一個塊。
所以我正在尋找兩件事。我想簡化io流的閱讀並消除消息主題的使用。其次,我想讓我的字符串解析工作。我一直在對此進行黑客攻擊,無法提供可行的解決方案。我是Rx的初學者。
編輯:這裏是成品的問題解決了之後....
var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
.SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
.Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
.Where(x => x.EndsWith("\r\n"))
.Select(buffered => String.Join("", buffered))
.Select(a => a.Replace("\n", ""));
「ReceiveUntilCompleted」 是從RXX項目的延伸。
我發現我並不需要.Buffer(1);最後。 – TK3
從答案中刪除它。 – ronag