2013-10-14 31 views
2

經過對StackOverflow的大量工作和研究 - 其中大部分已經過時,因爲Reactive Extensions代碼最近發生了變化 - 我終於可以消除從此Observable方法讀取數據的所有編譯錯誤一個套接字,並且我比起初我更瞭解這個代碼。但並不完全。是否有人可以用英語將此回讀給我,並回答兩三個問題?閱讀Observable TAP模式

從這個方法中提取緩衝數據(或者如果我錯了,它應該是多少)?是否有部分不再需要?儘管我真的很喜歡從我的業務代碼中解耦,並且只用一兩種方法保留所有套接字代碼,但有沒有更好的方法來做到這一點(解耦和可讀)?

public static IObservable<int> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None) 
    { 
     Contract.Requires(byteCount > 0); 

     return Observable.Create<int>(
      observer => 
      { 
       byte[] buffer = new byte[byteCount]; 
       int remainder = byteCount; 
       bool shutdown = false; 

       return Observable.Defer<int>(() => 
         Task.Factory.FromAsync<int>(socket.BeginReceive(buffer, buffer.Length - remainder, remainder, flags, 
         (result) => 
         { 
          var read = (int)result.AsyncState; 
          remainder -= read; 

          if (read == 0) 
           shutdown = true; 
         }, 
         null), socket.EndReceive).ToObservable()) 
        .TakeWhile(_ => remainder > 0 && !shutdown) 
        .TakeLast(1) 
        .Subscribe(
         observer.OnNext, 
         ex => 
         { 
          var socketError = ex as SocketException; 

          if (socketError != null 
           && (socketError.SocketErrorCode == SocketError.Disconnecting 
            || socketError.SocketErrorCode == SocketError.Shutdown)) 
          { 
           observer.OnCompleted(); 
          } 
          else { observer.OnError(ex); } 
         }, 
         observer.OnCompleted); 
      }); 
    } 
} 

調用它仍然有編譯錯誤我不明白(。做和.BitConverter有一些無效參數)功能:

 static IObservable<string> StartClient(this IObserver<ScanInformation> observer, IPAddress ip, int port) 
    { 
     var client = Observable.Using(
      () => new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp), 
      socket => 
      from _ in socket.WhenConnected(ip, port) 
      from message in 
       (from first in socket.WhenDataReceived(4) 
       let length = BitConverter.ToInt32(first, 0) 
       from message in 
        Observable.If(
         condition:() => length > 0, 
         thenSource: from second in socket.WhenDataReceived(length) 
            select Encoding.UTF8.GetString(second, 0, length), 
         elseSource: Observable.Return<string>(null)) 
       select message) 
       .Repeat() 
       .TakeWhile(message => message != null) 
      select message); 

     return 
      client.Do(observer).TakeLast(1); 
    } 
+0

請發佈您的編譯錯誤。 – bradgonesurfing

+0

錯誤15'System.IObservable '不包含'Do'的定義和最佳擴展方法重載'System.Reactive.Linq.Observable.Do (System.IObservable ,System.Action )'有一些無效參數 – shipr

+0

...和... 錯誤13'System.BitConverter.ToInt32(byte [],int)'的最佳重載方法匹配有一些無效參數 – shipr

回答

5

兩個編譯錯誤是由於路過輸入不正確參數。

Do()上的編譯錯誤是因爲您的觀察者是IObserver<ScanInformation>,但客戶端是IObservable<string>。你是否想要將字符串轉換爲ScanInformation的實例?

您在BitConverter上的編譯接受byte[]作爲第一個參數(要轉換的字節緩衝區),但是您傳遞的是int推測在某些時候您從WhenDataReceived返回Buffer;現在你正在傳回讀取的字節數。

Rx沒有太大的改變,這種代碼會破壞。你的代碼看起來可能遭受了一些複製/粘貼錯誤 - 直到它可能比有用的更容易混淆。有一個look at this blog post合理的外觀讀取實現,使用Rx以相當直接的方式打包TPL調用。 This discussion也可能是有啓發性的。

Rxx庫中還有一個相當不錯的ObservableSocket。見here

+0

非常感謝,但是...我讀了這兩個職位和其他許多人;我包括了非編譯代碼,儘管未定,只是爲了讓dataReceived的目的更清晰地告訴讀者,因爲我不清楚它。它說/做什麼?我試了很長時間才使它返回緩衝區,但無法編譯並執行該操作。示例代碼我搜索出來,以幫助提供錯誤,由於.While - > .TakeWhile和過時的代碼。 – shipr

+0

WhenDataReceived的目的似乎是從Socket套接字異步讀取'bytecount'字節,可能需要多次讀取才能完成此操作。它返回每次讀取時讀取的字節數,如果未能讀取字節數或發生其他異常,則發送OnError。緩衝區最終被丟棄。這是一個奇怪的實現。我會堅持我在上面提到你的博客文章。爲了得到更多的幫助,我建議詢問更狹義的問題,因爲不清楚你的困惑在哪裏:Rx,TPL,Socket編程或者一般的異步編碼! –

+0

我想要方法來讀取字節總數並返回緩衝區。會發生什麼變化? – shipr