1
我正在嘗試使用Rx完成對Web終點的完全異步讀取。我做了工作,在一個醜陋的方式,使用這樣的事情:使用Rx異步讀取webrequest
var reqUri = new Uri(string.Format("https://cds.cern.ch/record/{0}/export/xm?ln=en", docID));
var wr = WebRequest.CreateHttp(reqUri);
var s = Observable
.FromAsyncPattern<WebResponse>(
wr.BeginGetResponse,
wr.EndGetResponse)
.Invoke()
.Catch(Observable.Return<WebResponse>(null))
.Select(ExtractString)
.Select(ParseToMD);
的ExtractString是打開的流,並重新讀取,阻塞方法。然而,我想要做到異步,但要做到這一點,我遇到了一些麻煩。我認爲有以下應該工作:
var s = Observable
.FromAsyncPattern<WebResponse>(
wr.BeginGetResponse,
wr.EndGetResponse)
.Invoke()
.SelectMany(resp => Observable.Using(() => resp.GetResponseStream(), strm => Observable.Return(strm)))
.SelectMany(resp => Observable.Using(() => new StreamReader(resp), strm => Observable.Return(strm)))
.SelectMany(strm => Observable.FromAsync(tkn => strm.ReadToEndAsync()))
.Select(ParseToMD);
而且,如果我把一個破發點在有一些點,有時它的工作原理。其他時候它只是掛起。所以,我有兩個問題。首先,這是做這種事的正確方法嗎?其次,調試類似這樣的最佳方法是什麼?是否有一些可以觀察的跟蹤顯示了系統正在進行的事件?現在我剛剛在lambda函數中設置了斷點。
ParseMD函數只是將字符串轉換爲一類數據。
消除第一個使用(對於GetResponseStream)似乎是它的關鍵。我想這是因爲Observable.Return返回然後終止,它會在該流上進行正確的調用Dispose。下游項目仍在處理中。我仍然需要更好地瞭解這裏的生活。 :-)感謝間諜方法! – Gordon
哦,另一件事,StartAsync我認爲需要一個強制性的參數 - 取消令牌。雖然ReadToEndAsync不能使用它。 – Gordon
我在Rx的舊版本上運行/寫了上述內容,可以安全地忽略取消標記,在異步任務支持它的情況下,它可以通過取消訂閱。 –