2013-07-12 60 views
0

以下是我使用的代碼。主線程等待線程池線程執行。我使用AutoResetEvent(WaitHandle),但我真的很驚訝,因爲代碼不符合預期。使用AutoResetEvent同步.NET線程池

我有兩個同心for循環,其中Threadpool在內部循環中,並且期望對於外部循環的每次迭代,都應該處理所有內部循環值。主線程使用內部循環外部的AutoResetEvent WaitOne調用來等待,這是一個靜態變量,它在外循環的每次迭代時被重置爲內循環的最大值,並且在方法調用時使用Interlock遞減。調用AutoResetEvent的Set。但是,即使我希望靜態變量在每個內部循環後顯示0值,它也不會。我的代碼中存在什麼問題以及我有什麼更好的選擇來完成此任務?事實上,由於這些值的混合,主線程似乎並沒有等待線程池線程。

using System; 
using System.Threading; 

namespace TestThreads 
{ 
class Program 
{ 
    private static int threadingCounter = 0; 
    private static readonly object lockThreads = new Object(); 
    private AutoResetEvent areSync = new AutoResetEvent(true); 

    // <param name="args"></param> 
    static void Main(string[] args) 
    { 
     Program myProgram = new Program(); 

     try 
     { 
      try 
      { 
       for (int outer = 0; outer < 1000; outer++) 
       { 
        threadingCounter = 500; 
        try 
        { 
         for (int inner = 0; inner < 500; inner++) 
         { 
          ThreadPool.QueueUserWorkItem(new 
           WaitCallback(myProgram.ThreadCall), inner); 
         } 
        } 
        catch (Exception ex) 
        { 
         Console.WriteLine("Exception :: " + ex.Message); 
        } 
        finally 
        {        
         myProgram.areSync.WaitOne(); 
        } 

        if(threadingCounter != 0) 
         Console.WriteLine("In Loop1, Thread Counter :: " + 
          threadingCounter); 
       } 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("Exception :: " + ex.Message); 
      }     
     } 
     catch(Exception ex) 
     { 
      Console.WriteLine("Exception :: " + ex.Message); 
     } 
     finally 
     { 
      threadingCounter = 0; 

      if (myProgram.areSync != null) 
      { 
       myProgram.areSync.Dispose(); 
       myProgram.areSync = null; 
      } 
     } 
    } 

    public void ThreadCall(object state) 
    { 
     try 
     { 
      int inner = (int)state; 
      Thread.Sleep(1); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine("Exception :: " + ex.Message); 
     } 
     finally 
     { 
      Interlocked.Decrement(ref threadingCounter); 
      if (threadingCounter <= 0) 
       areSync.Set(); 
     }     
    }   
} 
} 
+0

你的異常處理是該死的糟糕。至少有一個可見的錯誤,您沒有正確使用Interlocked.Decrement。您必須使用它的返回值,並且*從不*在減量時直接使用該變量。使用CountDownEvent陷入成功之坑。 –

+0

@HansPassant,你的評論是粗魯的,事實上不準確。 –

+0

好吧,我被欺負,改寫。您的異常處理極大地增強了您的代碼的可讀性。訪問由其他線程修改的變量不是問題。更好? –

回答

1

你已經初始化的AutoResetEvent與初始狀態信號(真),這將使

myProgram.areSync.WaitOne(); 

第一個呼叫繼續而不堵,所以它繼續外環和隊列執行再次線程池,因此結果混亂。這很清楚。

更新您的代碼以

private AutoResetEvent areSync = new AutoResetEvent(false); 

了預期的結果。希望這有助於

+0

AutoResetEvent的初始化值爲true,在我的文章中是一個錯誤,因爲我嘗試了多種事情,我已經將其設置爲false,這並沒有幫助。任何其他點可能會幫助 –

+0

nope,設置新的AutoResetEvent(false)爲我工作我測試了ur代碼。 –

+0

上面提到的漢斯糾正碼,否則不正確。將導致一個問題離線多線程的情況 –

0

基於在博客中提供的詳細信息的更新:

http://zvolkov.com/clog/2009/07/10?s=Better+ways+to+wait+for+queued+threads+to+complete

我創建的進化方式來同步任務的工作例如,使用較新的並行編程模式。在我看來,List最適合我的案例,因爲在執行前添加任務/值沒有時間浪費。請分享您的觀點。順便說一句,我已經嘗試改進異常處理這次只有一個主線程和一個每個子線程的塊。

//代碼

using System; 
using System.Collections.Generic; 
using System.Threading; 
using System.Threading.Tasks; 

namespace TestThreads 
{ 
class Program 
{ 
    private static int threadingCounter = 0; 
    #if ARE // Synchronizing using AutoResetEvent   
    private static AutoResetEvent areSync = new AutoResetEvent(false); 
    #elif CDE // Synchronizing using CountDownEvent 
    private static CountdownEvent cdeSync; 
    #elif TLIST // Synchronizing through a List<Tasks> 
    private static List<Task> tlSync; 
    #elif ALIST // Synchronizing through a List<Action> 
    private static List<Action> alSync; 
    #elif PFE 
    #endif 

    /// <summary> 
    /// 
    /// </summary> 
    /// <param name="args"></param> 
    static void Main(string[] args) 
    { 
     Program myProgram = new Program(); 

     try 
     {     
      for (int outer = 0; outer < 1000; outer++) 
      { 
       threadingCounter = 500; 
       #if ARE          
        for (int inner = 0; inner < 500; inner++) 
         ThreadPool.QueueUserWorkItem(new WaitCallback(myProgram.ThreadCall), inner); 

        areSync.WaitOne(); 
        Console.WriteLine("Main Thread Released Post Wait ..."); 

        if (threadingCounter != 0) 
         Console.WriteLine("Threading Counter post Inner Loop :: " + threadingCounter); 
       #elif CDE 
        cdeSync = new CountdownEvent(threadingCounter); 
        for (int inner = 0; inner < 500; inner++) 
         ThreadPool.QueueUserWorkItem(new WaitCallback(myProgram.ThreadCall), inner); 

        cdeSync.Wait(); 
        Console.WriteLine("Main Thread Released Post Wait ..."); 

        if (threadingCounter != 0) 
         Console.WriteLine("Threading Counter post Inner Loop :: " + threadingCounter); 
        cdeSync = null; 
       #elif TLIST 
        tlSync = new List<Task>(); 
        for (int inner = 0; inner < 500; inner++) 
         tlSync.Add(Task.Factory.StartNew(() => myProgram.ThreadCall(inner))); 

        Task.WaitAll(tlSync.ToArray()); 
        tlSync = null; 

        Console.WriteLine("Main Thread Released Post Wait ..."); 

        if (threadingCounter != 0) 
         Console.WriteLine("Threading Counter post Inner Loop :: " + threadingCounter); 
       #elif ALIST 
        alSync = new List<Action>(); 
        for (int inner = 0; inner < 500; inner++) 
         alSync.Add(() => myProgram.ThreadCall(inner)); 

        Parallel.Invoke(alSync.ToArray()); 
        alSync = null; 

        Console.WriteLine("Main Thread Released Post Wait ..."); 


        if (threadingCounter != 0) 
         Console.WriteLine("Threading Counter post Inner Loop :: " + threadingCounter); 
       #elif PFE 
        List<int> innerCount = new List<int>(); 
        for (int iCount = 0; iCount < 500; iCount++) 
         innerCount.Add(iCount); 

        Parallel.ForEach<int>(innerCount, inner => 
        { 
         myProgram.ThreadCall(inner); 
        } 
        ); 

        innerCount = null; 

        Console.WriteLine("Main Thread Released Post Wait ..."); 

        if (threadingCounter != 0) 
         Console.WriteLine("Threading Counter post Inner Loop :: " + threadingCounter); 
        #endif 
      } 

     } 
     catch(Exception ex) 
     { 
      Console.WriteLine("Exception :: " + ex.Message); 
     } 
     finally 
     { 
      threadingCounter = 0; 
      #if ARE     
      if (areSync != null) 
      { 
       areSync.Dispose(); 
       areSync = null; 
      } 
      #elif CDE 
      if (cdeSync != null) 
      { 
       cdeSync.Dispose(); 
       cdeSync = null; 
      } 
      #elif TLIST 
      if (tlSync != null) 
      tlSync = null; 
      #elif ALIST 
      if (alSync != null) 
       alSync = null; 
      #endif 
     } 
    } 

    /// <summary> 
    /// 
    /// </summary> 
    /// <param name="state"></param> 
    public void ThreadCall(object state) 
    { 
     try 
     { 
      int inner = (int)state; 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine("Exception :: " + ex.Message); 
     } 
     finally 
     {  
      #if ARE 
      if(Interlocked.Decrement(ref threadingCounter) <= 0) 
       areSync.Set(); 
      #elif CDE 
      Interlocked.Decrement(ref threadingCounter); 
      cdeSync.Signal(); 
      #elif TLIST 
      Interlocked.Decrement(ref threadingCounter); 
      #elif ALIST 
      Interlocked.Decrement(ref threadingCounter); 
      #elif PFE 
      Interlocked.Decrement(ref threadingCounter); 
      #endif 
     } 

    }   
    } 
} 
0

我會重構這個使用這樣的事情。這假定你想在內部循環的後臺線程中執行某些操作,在繼續進行下一個外部循環之前完成其中的每一個,避免混亂的異常處理,同時仍捕獲處理過程中發生的異常,以便在處理後處理這些異常對於內部和外部循環都是完整的。

// track exceptions that occurred in loops 
class ErrorInfo 
{ 
    public Exception Error { get; set; } 
    public int Outer { get; set; } 
    public int Inner { get; set; } 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
     // something to store execeptions from inner thread loop 
     var errors = new ConcurrentBag<ErrorInfo>(); 
     // no need to wrap a try around this simple loop 
     // unless you want an exception to stop the loop 
     for (int outer = 0; outer < 10; outer++) 
     { 
      var tasks = new Task[50]; 
      for (int inner = 0; inner < 50; inner++) 
      { 
       var outerLocal = outer; 
       var innerLocal = inner; 
       tasks[inner] = Task.Factory.StartNew(() => 
        { 
         try 
         { 
          Thread.Sleep(innerLocal); 
          if (innerLocal % 5 == 0) 
          { 
           throw new Exception("Test of " + innerLocal); 
          } 
         } 
         catch (Exception e) 
         { 
          errors.Add(new ErrorInfo 
          { 
           Error = e, 
           Inner = innerLocal, 
           Outer = outerLocal 
          }); 
         } 
        }); 
      } 
      Task.WaitAll(tasks); 
     } 
     Console.WriteLine("Error bag contains {0} errors.", errors.Count); 
     Console.ReadLine(); 
    } 
}