2013-12-19 55 views
1

我正在嘗試使用Rx完成對Web終點的完全異步讀取。我做了工作,在一個醜陋的方式,使用這樣的事情:使用Rx異步讀取webrequest

 var reqUri = new Uri(string.Format("https://cds.cern.ch/record/{0}/export/xm?ln=en", docID)); 
     var wr = WebRequest.CreateHttp(reqUri); 

     var s = Observable 
       .FromAsyncPattern<WebResponse>(
        wr.BeginGetResponse, 
        wr.EndGetResponse) 
       .Invoke() 
       .Catch(Observable.Return<WebResponse>(null)) 
       .Select(ExtractString) 
       .Select(ParseToMD); 

的ExtractString是打開的流,並重新讀取,阻塞方法。然而,我想要做到異步,但要做到這一點,我遇到了一些麻煩。我認爲有以下應該工作:

 var s = Observable 
       .FromAsyncPattern<WebResponse>(
        wr.BeginGetResponse, 
        wr.EndGetResponse) 
       .Invoke() 
       .SelectMany(resp => Observable.Using(() => resp.GetResponseStream(), strm => Observable.Return(strm))) 
       .SelectMany(resp => Observable.Using(() => new StreamReader(resp), strm => Observable.Return(strm))) 
       .SelectMany(strm => Observable.FromAsync(tkn => strm.ReadToEndAsync())) 
       .Select(ParseToMD); 

而且,如果我把一個破發點在有一些點,有時它的工作原理。其他時候它只是掛起。所以,我有兩個問題。首先,這是做這種事的正確方法嗎?其次,調試類似這樣的最佳方法是什麼?是否有一些可以觀察的跟蹤顯示了系統正在進行的事件?現在我剛剛在lambda函數中設置了斷點。

ParseMD函數只是將字符串轉換爲一類數據。

回答

0

您可以使用Spy method I present in this question來追蹤流的生命週期。

你離我們不遠 - 我認爲問題可能是你對Observable.Using有點過分熱心。 Observable.Return可以完成引起競爭條件,在流讀取器完成之前可以處理響應流。我認爲這可能會更可靠:

var reqUri = new Uri(
    string.Format("https://cds.cern.ch/record/{0}/export/xm?ln=en", docId)); 

var s = Observable.FromAsyncPattern<WebResponse>(
    wr.BeginGetResponse, 
    wr.EndGetResponse) 
    .Invoke() 
    .Select(response => response.GetResponseStream()) 
    .SelectMany(stream => 
     Observable.Using( () => new StreamReader(stream), 
         reader => Observable.StartAsync(reader.ReadToEndAsync))) 
    .Select(ParseToMD); 
+0

消除第一個使用(對於GetResponseStream)似乎是它的關鍵。我想這是因爲Observable.Return返回然後終止,它會在該流上進行正確的調用Dispose。下游項目仍在處理中。我仍然需要更好地瞭解這裏的生活。 :-)感謝間諜方法! – Gordon

+0

哦,另一件事,StartAsync我認爲需要一個強制性的參數 - 取消令牌。雖然ReadToEndAsync不能使用它。 – Gordon

+0

我在Rx的舊版本上運行/寫了上述內容,可以安全地忽略取消標記,在異步任務支持它的情況下,它可以通過取消訂閱。 –