2016-01-23 81 views
4

我是Rx的新手,所以我可能在這裏犯了一些重大錯誤。Observable TcpListener在單連接後終止

我想創建一個非常簡單的套接字服務器,可以使用Observables從客戶端接收消息。爲此,我使用了Rxx,它在System.Net.Sockets命名空間中提供了擴展方法,並且還提供了ObserableTcpListener靜態工廠類。

這裏是我到目前爲止,幾乎從各種渠道竊取了它:

IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001); 
TcpListener listener = new TcpListener(endpoint); 

IObservable<TcpClient> clients = listener 
    .StartSocketObservable(1) 
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket)); 
    .Finally(listener.Stop) 

clients.Subscribe(client => 
{ 
    OnConnect(client).Subscribe(
     message => OnMessage(client, message), 
     ex => OnException(client, ex), 
     () => OnCompleted(client)); 
}); 

private static IObservable<TcpClient> SocketToTcpClient(Socket socket) 
{ 
    TcpClient client = new TcpClient(); 
    client.Client = socket; 
    return Observable.Return<TcpClient>(client); 
} 

private static IObservable<byte[]> OnConnect(TcpClient client) 
{ 
    return client.Client.ReceiveUntilCompleted(SocketFlags.None); 
} 

private static void OnMessage(TcpClient client, byte[] message) 
{ 
    Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message)); 
} 

private static void OnCompleted(TcpClient client) 
{ 
    Console.WriteLine("Completed."); 
} 

private static void OnException(TcpClient client, Exception ex) 
{ 
    Console.WriteLine("Exception: {0}", ex.ToString()); 
} 

這個工程......到一個點。我可以建立一個單一的客戶端連接。一旦該連接終止,看起來Observable序列終止並調用.Finally(listener.Stop)。顯然,那不是我想要的。

我試過使用ObserableTcpListener.Start()工廠類,但是這個結果與我相同。

IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint); 
sockets.Subscribe(client => 
{ 
    OnConnect(client).Subscribe(
     message => OnMessage(client, message), 
     ex => OnException(client, ex), 
     () => OnCompleted(client)); 
}); 

我想我瞭解這裏的問題:clients觀察到的序列僅僅是空的第一個客戶端之後,終止,從而.Finally(listener.Stop)被調用。

我需要做些什麼來規避這種情況?我怎樣才能繼續收聽傳入的連接?

+0

https://searchcode.com/codesearch/view/14317362/對我來說,代碼看起來像只接受一個連接。無論如何,建議放棄這種方法並使用標準技術來運行套接字。我看不到任何直接使用TcpListener/Client的優點,可能還有等待。 – usr

+1

@usr主要原因在於,大多數情況下,我喜歡用Rx方式編寫代碼,因爲它非常易讀,並且非常清楚地表達自己的意圖。第二個原因是我正在編寫一堆不同的事件風格的代碼,而Rx提供了對所有這些風格的很好的抽象,並且能夠保持模式相同。第三,我只是想學習Rx。感謝您的建議! –

+0

如果我可以添加一個反參數:這種風格將普通的順序代碼分割成回調,這對於代碼質量來說通常是一件很糟糕的事情。例如,你在你的代碼中有一個典型的錯誤,那就是你認爲TCP發送了「消息」。它不會,因此如果你運氣不好,「Encoding.UTF8.GetString」有時會返回垃圾,並且你的「消息」通過UTF8編碼的代碼點被中途分割。這種風格很難解決。在順序代碼中,您可以使用StreamReader或BinaryReader並提取數據。推動你必須採取什麼來。 – usr

回答

4

使您的Observable並持續有訂閱時。

IObservable<TcpClient> clients = listener 
    .StartSocketObservable(1) 
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket)) 
    .Finally(listener.Stop) 
    .Publish().RefCount(); 
+0

對不起,在發佈這個問題後我生病了一段時間,我完全忘記了我問過它。無論哪種方式,我在幾個小時前嘗試過,是的,這實際上似乎確實修復了它。 –