2015-05-18 80 views
0

我正在使用Rx從NetworkStream中讀取數據,並將結果作爲Hot Observable提供。 即使查詢效果很好,我不確定是否最適合完成基於NetworkStream的序列的條件。我有一些情況,序列完成,另一方面的TcpListener尚未完成或關閉連接。如何在Rx查詢時檢測NetworkStream何時結束

這是查詢。我會很高興得到有關的權利的情況提出了一些建議,以安全終止序列:

private IDisposable GetStreamSubscription(TcpClient client) 
{ 
    return Observable.Defer(() => { 
     var buffer = new byte[client.ReceiveBufferSize]; 

     return Observable.FromAsync<int>(() => { 
     return client.GetStream().ReadAsync (buffer, 0, buffer.Length); 
     }) 
     .SubscribeOn(NewThreadScheduler.Default) 
     .Select(x => buffer.Take(x).ToArray()); 
    }) 
    .Repeat() 
    .TakeWhile(bytes => bytes.Any()) //This is the condition to review 
    .Subscribe(bytes => { 
     //OnNext Logic 
    }, ex => { 
     //OnError logic 
    },() => { 
     //OnCompleted Logic 
    }); 
} 

只是要清楚我的問題,我需要知道檢測的最好方式,當網絡流是否從另一完成方(由於斷開連接,錯誤或其他)。現在我通過調用ReadAsync來完成它,直到沒有字節被返回,但我不知道這是否完全安全。

+0

這段代碼甚至沒有編譯。你可以請張貼編譯的代碼嗎? – Enigmativity

+0

我剛剛編輯代碼,刪除無用的部分並使其可編譯 –

+0

根據MSDN文檔:如果當前可用的字節數小於請求的數字,結果值可能小於請求的字節數,或者它可以是如果已到達流的末尾,則返回0(零)。 – whoisj

回答

0

這是做你想做的嗎?

private IDisposable GetStreamSubscription(TcpClient client) 
{ 
    return Observable 
     .Defer(() => 
     { 
      var buffer = new byte[client.ReceiveBufferSize]; 
      return Observable.Using(
       () => client.GetStream(), 
       st => Observable.While(
        () => st.DataAvailable, 
        Observable.Start(() => 
        { 
         var bytes = st.Read(buffer, 0, buffer.Length); 
         return buffer.Take(bytes).ToArray(); 
        }))); 
     }) 
     .SubscribeOn(NewThreadScheduler.Default) 
     .Subscribe(bytes => { 
      //OnNext Logic 
     }, ex => { 
      //OnError logic 
     },() => { 
      //OnCompleted Logic 
     }); 
} 

請注意,這將安全地處理流並在不再有任何可用數據時結束。