我正在使用Reactive Extensions創建匿名Observable來執行Web請求,後處理它並返回一些值。這種技術幫助我將價值獲取的機制與使用這些價值的其他類別分開。 Web請求經常執行,所以有一個計時器使用這種技術構造和調用請求。我建立Observable如下:Rx訂閱()回調問題
Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse)
實際上,端點並不總是可用的。對於在Subscribe()
中指定的Web請求超時回調,永遠不會調用方法。我正在使用Timeout()
Rx擴展來處理這種情況。此外,我正在做請求結果的後處理。下面是一段我的仿真代碼,我實現來解決這個問題:
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...)
然後我實現處置Subscribe()
方法的返回值退訂從可觀察的回調委託。其實我有一個三元組的集合<IObservable obs, IDisposable disp, bool RequestComplete>
。 Subscribe()
設置方法回調RequestComplete = true
並且隨着時間的推移,如果不再需要所有一次性物品,即所有一次性物品都已被調用,即回調已被調用。一切似乎都很好,但我注意到這個系列不斷增長,我不知道爲什麼。
所以我實現了這種情況的模擬。此代碼沒有佈置部件 - 我用簡單的請求計數器取代了它:
public static class RequestCounter
{
private static object syncRoot = new object();
private static int count = 0;
public static void Increment()
{
lock (syncRoot)
{
count++;
}
}
public static void Decrement()
{
lock (syncRoot)
{
count--;
}
}
public static int RequestCount
{
get { return count; }
}
}
而這裏的測試本身:
public void RunTest()
{
while (true)
{
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
RequestCounter.Increment();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
res =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
ex =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
() =>
{
});
}
}
Web請求計數器被遞增之前。請求超時後,它會減少。但請求櫃檯增長無限。你能解釋我錯過了什麼嗎?
它證明了線程池沒有自由線程來處理遞減 – EvAlex