我是Rx的新手,所以我可能在這裏犯了一些重大錯誤。Observable TcpListener在單連接後終止
我想創建一個非常簡單的套接字服務器,可以使用Observables從客戶端接收消息。爲此,我使用了Rxx,它在System.Net.Sockets命名空間中提供了擴展方法,並且還提供了ObserableTcpListener靜態工廠類。
這裏是我到目前爲止,幾乎從各種渠道竊取了它:
IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket));
.Finally(listener.Stop)
clients.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
TcpClient client = new TcpClient();
client.Client = socket;
return Observable.Return<TcpClient>(client);
}
private static IObservable<byte[]> OnConnect(TcpClient client)
{
return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}
private static void OnMessage(TcpClient client, byte[] message)
{
Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message));
}
private static void OnCompleted(TcpClient client)
{
Console.WriteLine("Completed.");
}
private static void OnException(TcpClient client, Exception ex)
{
Console.WriteLine("Exception: {0}", ex.ToString());
}
這個工程......到一個點。我可以建立一個單一的客戶端連接。一旦該連接終止,看起來Observable序列終止並調用.Finally(listener.Stop)
。顯然,那不是我想要的。
我試過使用ObserableTcpListener.Start()
工廠類,但是這個結果與我相同。
IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
我想我瞭解這裏的問題:clients
觀察到的序列僅僅是空的第一個客戶端之後,終止,從而.Finally(listener.Stop)
被調用。
我需要做些什麼來規避這種情況?我怎樣才能繼續收聽傳入的連接?
https://searchcode.com/codesearch/view/14317362/對我來說,代碼看起來像只接受一個連接。無論如何,建議放棄這種方法並使用標準技術來運行套接字。我看不到任何直接使用TcpListener/Client的優點,可能還有等待。 – usr
@usr主要原因在於,大多數情況下,我喜歡用Rx方式編寫代碼,因爲它非常易讀,並且非常清楚地表達自己的意圖。第二個原因是我正在編寫一堆不同的事件風格的代碼,而Rx提供了對所有這些風格的很好的抽象,並且能夠保持模式相同。第三,我只是想學習Rx。感謝您的建議! –
如果我可以添加一個反參數:這種風格將普通的順序代碼分割成回調,這對於代碼質量來說通常是一件很糟糕的事情。例如,你在你的代碼中有一個典型的錯誤,那就是你認爲TCP發送了「消息」。它不會,因此如果你運氣不好,「Encoding.UTF8.GetString」有時會返回垃圾,並且你的「消息」通過UTF8編碼的代碼點被中途分割。這種風格很難解決。在順序代碼中,您可以使用StreamReader或BinaryReader並提取數據。推動你必須採取什麼來。 – usr