我目前正在編寫一個ProxyChecker庫。 我正在使用一個線程,讓一個Parallel.ForEach循環檢查所有代理。 我正在使用CancellationTokenSource
(cts)進行軟中止(使用cts.Cancel()
)。 正如你可以在下面的代碼中看到的,我添加了一個「測試代碼」,它將當前線程寫入控制檯。Parallel.ForEach CancellationTokenSource not Stopping
這裏是u需要的代碼:
private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
{
_cts = new CancellationTokenSource();
int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
mainThread = new Thread(() =>
{
try
{
Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
{
Interlocked.Increment(ref running);
Console.WriteLine("thread running: {0}", running);
try
{
_cts.Token.ThrowIfCancellationRequested();
if (CheckProxy(prox, domainToCheckWith, timeout))
{
Interlocked.Increment(ref checkedProxyCount);
Interlocked.Increment(ref goodProxies);
Interlocked.Decrement(ref uncheckedProxyCount);
}
else
{
Interlocked.Increment(ref checkedProxyCount);
Interlocked.Decrement(ref uncheckedProxyCount);
Interlocked.Increment(ref badProxies);
}
_cts.Token.ThrowIfCancellationRequested();
OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
}
catch (OperationCanceledException ex) {}
catch (ObjectDisposedException ex) {}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
}
finally
{
Console.WriteLine("thread running: {0}", running);
Interlocked.Decrement(ref running);
}
});
}
catch (OperationCanceledException ex) {}
catch (ObjectDisposedException ex) {}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
}
finally
{
isRunning = false;
OnComplete();
}
});
mainThread.Start();
}
(我拿出了幾行,因爲它也沒用作罷完整的代碼)
thread running: 1
thread running: 1
thread running: 2
thread running: 2
//Slowly going up to 50
thread running: 50
thread running: 50
thread running: 50
//Staying at 50 till I press stop
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46
//Going down...
thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4
然後停靠在4輸出或3或2(每次不同)。我等了幾分鐘,但沒有停下來,也沒有執行Parallel.ForEach之後的代碼。
該請求的超時是5000,螺紋是50
下面有用於檢查其他代碼:
private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
try
{
WebRequest req = WebRequest.Create(domainToCheckWith);
req.Proxy = new WebProxy(proxy);
req.Timeout = timeout;
var response = (HttpWebResponse) req.GetResponse();
string responseString = ReadResponseString(response);
if (responseString.Contains("SOMETHING HERE"))
{
OnGoodProxy(proxy);
return true;
}
if (responseString.Contains("SOMEOTHERTHINGHERE"))
{
OnBadProxy(proxy);
return false;
}
OnBadProxy(proxy);
return false;
}
catch (WebException ex)
{
OnBadProxy(proxy);
return false;
}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
return false;
}
}
停止功能:
public void StopChecking()
{
try
{
if (_cts != null && mainThread.IsAlive)
{
if (_cts.IsCancellationRequested)
{
mainThread.Abort();
OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
while (mainThread.IsAlive) ;
OnComplete();
isRunning = false;
}
else
{
_cts.Cancel();
OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
}
}
}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
}
}
重要編輯:
我將此添加到CeckProxy函數中:
Stopwatch sw = new Stopwatch();
sw.Start();
string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
sw.Stop();
這是最後的線程數的結果:
thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3
這是爲什麼這麼久?我的意思是180秒?!
你有沒有真正調用_cts.Cancel()。我無法在任何地方看到它。 – brumScouse
@brumScouse我沒有添加該部分,但我可以。我將在第二個 – coolerfarmer
編輯OP我會冒險猜測StopChecking中的_cts必須被限制在該方法所在的類中?我注意到_cts每次在你的CheckProxies方法中都會出現,這是故意的嗎?我的意思是,這裏的_cts必須和StopChecking中的一樣(我只是在檢查) – brumScouse