2012-10-05 96 views
0

我正在使用Reactive Extensions創建匿名Observable來執行Web請求,後處理它並返回一些值。這種技術幫助我將價值獲取的機制與使用這些價值的其他類別分開。 Web請求經常執行,所以有一個計時器使用這種技術構造和調用請求。我建立Observable如下:Rx訂閱()回調問題

Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse) 

實際上,端點並不總是可用的。對於在Subscribe()中指定的Web請求超時回調,永遠不會調用方法。我正在使用Timeout() Rx擴展來處理這種情況。此外,我正在做請求結果的後處理。下面是一段我的仿真代碼,我實現來解決這個問題:

var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint"); 
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)(); 
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...) 

然後我實現處置Subscribe()方法的返回值退訂從可觀察的回調委託。其實我有一個三元組的集合<IObservable obs, IDisposable disp, bool RequestComplete>Subscribe()設置方法回調RequestComplete = true並且隨着時間的推移,如果不再需要所有一次性物品,即所有一次性物品都已被調用,即回調已被調用。一切似乎都很好,但我注意到這個系列不斷增長,我不知道爲什麼。

所以我實現了這種情況的模擬。此代碼沒有佈置部件 - 我用簡單的請求計數器取代了它:

public static class RequestCounter 
{ 
    private static object syncRoot = new object(); 

    private static int count = 0; 

    public static void Increment() 
    { 
     lock (syncRoot) 
     { 
      count++; 
     } 
    } 

    public static void Decrement() 
    { 
     lock (syncRoot) 
     { 
      count--; 
     } 
    } 

    public static int RequestCount 
    { 
     get { return count; } 
    } 
} 

而這裏的測試本身:

public void RunTest() 
    { 
     while (true) 
     { 
      var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint"); 
      var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)(); 
      RequestCounter.Increment(); 
      var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
       res => 
       { 
        RequestCounter.Decrement(); 
        Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount); 
       }, 
       ex => 
       { 
        RequestCounter.Decrement(); 
        Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount); 
       }, 
       () => 
       { 
       }); 
     } 
    } 

Web請求計數器被遞增之前。請求超時後,它會減少。但請求櫃檯增長無限。你能解釋我錯過了什麼嗎?

+0

它證明了線程池沒有自由線程來處理遞減 – EvAlex

回答

1

對可觀察序列的訂閱是異步的。 while while循環會盡可能快地繼續。增量將超過延遲的減量。

如果您使用的Rx 2.0與.NET 4.5,你可以使用伺機來運行你的循環,然後繼續下面的迭代時,先前的請求完成:

while (true) 
{ 
    var obs = ...; 

    RequestCounter.Increment(); 

    await obs.Select(...).Timeout(...); 

    RequestCounter.Decrement(); 

    Console.WriteLine(...); 
} 

更大的問題是,爲什麼如果請求已完成,您是否嘗試處理訂閱?當序列達到終端狀態(OnError,OnCompleted)時,Rx會自行清理,因此您可以簡單地將IDisposable對象放在地板上,而不用擔心這些。

+0

哦,我明白了,謝謝!它會自動取消訂閱這些代表嗎? – EvAlex

+0

是的,除非您想提前取消計算(即在序列到達終端狀態之前),否則無需擔心消費者。 –