2014-06-15 30 views
1

我目前正在編寫一個ProxyChecker庫。 我正在使用一個線程,讓一個Parallel.ForEach循環檢查所有代理。 我正在使用CancellationTokenSource(cts)進行軟中止(使用cts.Cancel())。 正如你可以在下面的代碼中看到的,我添加了一個「測試代碼」,它將當前線程寫入控制檯。Parallel.ForEach CancellationTokenSource not Stopping

這裏是u需要的代碼:

private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith) 
     { 
      _cts = new CancellationTokenSource(); 
      int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0; 
      mainThread = new Thread(() => 
      { 
       try 
       { 
        Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox => 
        { 
         Interlocked.Increment(ref running); 
         Console.WriteLine("thread running: {0}", running); 
         try 
         { 
          _cts.Token.ThrowIfCancellationRequested(); 
          if (CheckProxy(prox, domainToCheckWith, timeout)) 
          { 
           Interlocked.Increment(ref checkedProxyCount); 
           Interlocked.Increment(ref goodProxies); 
           Interlocked.Decrement(ref uncheckedProxyCount); 
          } 
          else 
          { 
           Interlocked.Increment(ref checkedProxyCount); 
           Interlocked.Decrement(ref uncheckedProxyCount); 
           Interlocked.Increment(ref badProxies); 
          } 
          _cts.Token.ThrowIfCancellationRequested(); 
          OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies); 
         } 
         catch (OperationCanceledException ex) {} 
         catch (ObjectDisposedException ex) {} 
         catch (Exception ex) 
         { 
          OnLog(ex.Message, Color.Red); 
         } 
         finally 
         { 
          Console.WriteLine("thread running: {0}", running); 
          Interlocked.Decrement(ref running); 
         } 
        }); 
       } 
       catch (OperationCanceledException ex) {} 
       catch (ObjectDisposedException ex) {} 
       catch (Exception ex) 
       { 
        OnLog(ex.Message, Color.Red); 
       } 
       finally 
       { 
        isRunning = false; 
        OnComplete(); 
       } 
      }); 
      mainThread.Start(); 
     } 

(我拿出了幾行,因爲它也沒用作罷完整的代碼)

thread running: 1 
thread running: 1 
thread running: 2 
thread running: 2 

//Slowly going up to 50 

thread running: 50 
thread running: 50 
thread running: 50 

//Staying at 50 till I press stop 

thread running: 50 
thread running: 50 
thread running: 50 
thread running: 50 
thread running: 50 
thread running: 49 
thread running: 48 
thread running: 47 
thread running: 46 

//Going down... 

thread running: 17 
thread running: 16 
thread running: 15 
thread running: 14 
thread running: 13 
thread running: 12 
thread running: 11 
thread running: 10 
thread running: 10 
thread running: 8 
thread running: 7 
thread running: 6 
thread running: 5 
thread running: 4 

然後停靠在4輸出或3或2(每次不同)。我等了幾分鐘,但沒有停下來,也沒有執行Parallel.ForEach之後的代碼。

該請求的超時是5000,螺紋是50

下面有用於檢查其他代碼:

private bool CheckProxy(string proxy, string domainToCheckWith, int timeout) 
{ 
    try 
    { 
     WebRequest req = WebRequest.Create(domainToCheckWith); 
     req.Proxy = new WebProxy(proxy); 
     req.Timeout = timeout; 
     var response = (HttpWebResponse) req.GetResponse(); 
     string responseString = ReadResponseString(response); 

     if (responseString.Contains("SOMETHING HERE")) 
     { 
      OnGoodProxy(proxy); 
      return true; 
     } 
     if (responseString.Contains("SOMEOTHERTHINGHERE")) 
     { 
      OnBadProxy(proxy); 
      return false; 
     } 
     OnBadProxy(proxy); 
     return false; 
    } 
    catch (WebException ex) 
    { 
     OnBadProxy(proxy); 
     return false; 
    } 
    catch (Exception ex) 
    { 
     OnLog(ex.Message, Color.Red); 
     return false; 
    } 
} 

停止功能:

public void StopChecking() 
{ 
    try 
    { 
     if (_cts != null && mainThread.IsAlive) 
     { 
      if (_cts.IsCancellationRequested) 
      { 
       mainThread.Abort(); 
       OnLog("Hard aborting Filter Threads...", Color.DarkGreen); 
       while (mainThread.IsAlive) ; 
       OnComplete(); 
       isRunning = false; 
      } 
      else 
      { 
       _cts.Cancel(); 
       OnLog("Soft aborting Filter Threads...", Color.DarkGreen); 
      } 
     } 
    } 
    catch (Exception ex) 
    { 
     OnLog(ex.Message, Color.Red); 
    } 
} 

重要編輯:

我將此添加到CeckProxy函數中:

 Stopwatch sw = new Stopwatch(); 
     sw.Start(); 
     string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd(); 
     sw.Stop(); 

這是最後的線程數的結果:

thread running: 6 
4449 
thread running: 5 
72534 
thread running: 4 
180094 
thread running: 3 

這是爲什麼這麼久?我的意思是180秒?!

+0

你有沒有真正調用_cts.Cancel()。我無法在任何地方看到它。 – brumScouse

+0

@brumScouse我沒有添加該部分,但我可以。我將在第二個 – coolerfarmer

+0

編輯OP我會冒險猜測StopChecking中的_cts必須被限制在該方法所在的類中?我注意到_cts每次在你的CheckProxies方法中都會出現,這是故意的嗎?我的意思是,這裏的_cts必須和StopChecking中的一樣(我只是在檢查) – brumScouse

回答

0

好吧,我理解了它自己。

我現在連續讀取響應,並用秒錶(和request.ReadWriteTimeout)檢查讀取部分在特定時間(在我的情況下爲readTimeout)後是否​​停止。代碼

HttpWebRequest req = (HttpWebRequest)WebRequest.Create(domainToCheckWith); 
      req.Proxy = new WebProxy(proxy); 
      req.Timeout = timeout; 
      req.ReadWriteTimeout = readTimeout; 
      req.Headers.Add(HttpRequestHeader.AcceptEncoding, "deflate,gzip"); 
      req.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip; 

      byte[] responseByte = new byte[1024]; 
      string responseString = string.Empty; 

      sw.Start(); 
      using (WebResponse res = req.GetResponse()) 
      { 
       using (Stream stream = res.GetResponseStream()) 
       { 
        while (stream.Read(responseByte, 0, responseByte.Length) > 0) 
        { 
         responseString += Encoding.UTF8.GetString(responseByte); 
         if(sw.ElapsedMilliseconds > (long)timeout) 
          throw new WebException(); 
        } 

       } 
      } 
      sw.Stop(); 
0

你可以試試鎖內試圖鎖定對象

Object lockObject = new Object(); 
try 
{ 
    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox => 
    { 
     Interlocked.Increment(ref running); 
     Console.WriteLine("thread running: {0}", running); 
     try 
     { 
      lock(lockObject) 
      { 
       //code............. 
      } 
     } 
     catch 
     { 
     } 
    } 
} 
catch 
{ 
} 
+2

它工作,但現在它不再是真正的多線程了,因爲它等待直到前一個線程完成/鎖定被釋放! 所以這實際上並沒有幫助我! – coolerfarmer

+0

你將不得不做這樣的事情ParallelOptions pOptions = new ParallelOptions();比在你的try catch塊或者內部,否則你將不得不調用pOptions。 CancellationToken.ThrowIfCancellationRequested();或者你也可以有...,代理,(我,loopstate)=>比內部嘗試趕上你需要loopState.Stop(); –

+0

@Davidson我也試過,但它是相同的結果 – coolerfarmer