2010-12-02 96 views
11

在使用任務並行庫之前,我經常使用CorrelationManager.ActivityId跟蹤多線程的跟蹤/錯誤報告。任務並行庫中的任務如何影響ActivityID?

ActivityId存儲在線程本地存儲中,所以每個線程都有它自己的副本。這個想法是,當你啓動一個線程(活動)時,你分配一個新的ActivityId。 ActivityId將與任何其他跟蹤信息一起寫入日誌,使單個「活動」的跟蹤信息成爲可能。這對於WCF非常有用,因爲ActivityId可以結轉到服務組件。

這裏是我談論的例子:

static void Main(string[] args) 
{ 
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) => 
    { 
     DoWork(); 
    })); 
} 

static void DoWork() 
{ 
    try 
    { 
     Trace.CorrelationManager.ActivityId = Guid.NewGuid(); 
     //The functions below contain tracing which logs the ActivityID. 
     CallFunction1(); 
     CallFunction2(); 
     CallFunction3(); 
    } 
    catch (Exception ex) 
    { 
     Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString()); 
    } 
} 

現在,隨着TPL,我的理解是多任務共享線程。這是否意味着ActivityId很容易被重新初始化(通過其他任務)?有沒有新的機制來處理活動追蹤?

+0

我沒有什麼優惠,但我也對這個問題感興趣。似乎同樣的問題也適用於使用CallContext.LogicalSetData的信息集,因爲這是Trace.CorrelationManager用來存儲ActivityId和LogicalOperationStack的技術。 – wageoghe 2010-12-14 19:47:08

+0

@wageohe - 我終於可以今天這個測試,已經張貼了我的結果:) – 2010-12-15 00:35:58

+0

我張貼一些更多的細節在我的答案。我還在這裏發佈了一個關於SO的另一個答案的鏈接,這是我在這裏問到的一個新問題,以及我在Microsoft的Parallel Extensions論壇上提問(但尚未在2011年1月21日回答)的問題。也許你會發現有用的信息,也許不會。 – wageoghe 2011-01-21 15:54:47

回答

6

我跑了一些實驗,結果我的問題中的假設是不正確的 - 用TPL創建的多個任務不能同時在同一個線程上運行。

ThreadLocalStorage在.NET 4.0中與TPL一起使用是安全的,因爲線程一次只能由一個任務使用。

這些任務可以共享併發的基礎上接受採訪時,我聽到關於C#5.0DotNetRocks(抱歉,我不記得這表明它是)線程的假設 - 所以我的問題可能(也可能不會)即將成爲相關。

我的實驗啓動了許多任務,並記錄了運行了多少任務,他們花了多長時間,以及消耗了多少線程。如果有人想重複,代碼如下。

class Program 
{ 
    static void Main(string[] args) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 
     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
      task = Task.Factory.StartNew(() => 
      { 
       DoLongRunningWork(); 
      }, taskCreationOpt); 

      allTasks[i] = task; 
     } 

     Task.WaitAll(allTasks); 
     stopwatch.Stop(); 

     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.ReadKey(); 
    } 


    private static List<int> threadIds = new List<int>(); 
    private static object locker = new object(); 
    private static void DoLongRunningWork() 
    { 
     lock (locker) 
     { 
      //Keep a record of the managed thread used. 
      if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
       threadIds.Add(Thread.CurrentThread.ManagedThreadId); 
     } 
     Guid g1 = Guid.NewGuid(); 
     Trace.CorrelationManager.ActivityId = g1; 
     Thread.Sleep(3000); 
     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 
    } 
} 

輸出(當然這將取決於機器上)是:

Completed 100 tasks in 23097 milliseconds 
Used 23 threads 

更改taskCreationOpt到TaskCreationOptions.LongRunning給出不同的結果:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads 
3

請原諒我的這個帖子作爲一個答案,因爲它不是真的回答你的問題,但是,它涉及到你的問題,因爲它涉及CorrelationManager行爲和線程/任務/等。我一直在考慮使用CorrelationManager的LogicalOperationStack(和StartLogicalOperation/StopLogicalOperation方法)在多線程場景中提供額外的上下文。

我把你的例子稍加修改,添加使用Parallel.For並行執行工作的能力。此外,我使用StartLogicalOperation/StopLogicalOperation括號(內部)DoLongRunningWork。從概念上講,DoLongRunningWork做這樣的事情是每次執行:

DoLongRunningWork 
    StartLogicalOperation 
    Thread.Sleep(3000) 
    StopLogicalOperation 

我發現,如果我添加這些邏輯操作代碼(或多或少是),所有的邏輯operatins的保持同步(總是預期的堆棧操作次數和堆棧操作的值始終如預期)。

在我的一些測試中,我發現並非總是如此。邏輯操作堆棧正在「損壞」。我可以想到的最好的解釋是,當「子」線程退出時,將「CallContext」信息「合併」回「父」線程上下文導致「舊」子線程上下文信息(邏輯操作)爲「繼承「由另一個兄弟的子線程。

這個問題也可能與以下事實有關:Parallel.For顯然使用主線程(至少在示例代碼中,如寫入的那樣)作爲「工作線程」之一(或者任何它們應該在並行域)。無論何時執行DoLongRunningWork,都會啓動一個新的邏輯操作(開始時)並停止(結束時)(也就是將其壓入LogicalOperationStack並從中彈出)。如果主線程已經具有邏輯操作並且DoLongRunningWork在主線程上執行,則啓動新的邏輯操作,因此主線程的LogicalOperationStack現在具有兩個操作。任何後續的DoLongRunningWork執行(只要DoLongRunningWork的這個「迭代」在主線程上執行)將(顯然)繼承主線程的LogicalOperationStack(現在有兩個操作,而不僅僅是一個預期的操作)。

我花了很長時間才弄清楚爲什麼LogicalOperationStack的行爲在我的示例中與我的示例的修改版本中的行爲不同。最後,我看到在我的代碼中,我用邏輯操作括住了整個程序,而在我的修改版本的測試程序中,我沒有。其含義是,在我的測試程序中,每次執行我的「工作」(類似於DoLongRunningWork),已經有一個合理的操作。在我的測試程序的修改版本中,我沒有將邏輯操作中的整個程序括起來。

所以,當我修改你的測試程序來將邏輯操作中的整個程序括起來,並且如果我使用的是Parallel.For,我遇到了完全相同的問題。

使用上面的概念模型,這將成功運行:

Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 

雖然這最終將導致斷言到一個明顯的不同步LogicalOperationStack的:

StartLogicalOperation 
Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 
StopLogicalOperation 

這裏是我的示例程序。它和你的類似,它有一個操作ActivityId和LogicalOperationStack的DoLongRunningWork方法。我也有兩種踢DoLongRunningWork的風格。一種風味使用任務,一種使用Parallel.For。也可以執行每種風格,以使得整個並行操作被封閉在邏輯操作中或不被操作。所以,總共有4種方式來執行並行操作。要嘗試每一個,只需取消註釋所需的「使用...」方法,重新編譯並運行。 UseTasks,UseTasks(true)UseParallelFor應該全部運行完成。 UseParallelFor(true)將在某個時刻斷言,因爲LogicalOperationStack沒有預期的條目數。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Diagnostics; 
using System.Threading; 
using System.Threading.Tasks; 

namespace CorrelationManagerParallelTest 
{ 
    class Program 
    {  
    static void Main(string[] args)  
    { 
     //UseParallelFor(true) will assert because LogicalOperationStack will not have expected 
     //number of entries, all others will run to completion. 

     UseTasks(); //Equivalent to original test program with only the parallelized 
         //operation bracketed in logical operation. 
     ////UseTasks(true); //Bracket entire UseTasks method in logical operation 
     ////UseParallelFor(); //Equivalent to original test program, but use Parallel.For 
          //rather than Tasks. Bracket only the parallelized 
          //operation in logical operation. 
     ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation 
    }  

    private static List<int> threadIds = new List<int>();  
    private static object locker = new object();  

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId; 

    private static int mainThreadUsedInDelegate = 0; 

    // baseCount is the expected number of entries in the LogicalOperationStack 
    // at the time that DoLongRunningWork starts. If the entire operation is bracketed 
    // externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise, 
    // it will be 0. 
    private static void DoLongRunningWork(int baseCount)  
    { 
     lock (locker) 
     { 
     //Keep a record of the managed thread used.    
     if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
      threadIds.Add(Thread.CurrentThread.ManagedThreadId); 

     if (Thread.CurrentThread.ManagedThreadId == mainThreadId) 
     { 
      mainThreadUsedInDelegate++; 
     } 
     }   

     Guid lo1 = Guid.NewGuid(); 
     Trace.CorrelationManager.StartLogicalOperation(lo1); 

     Guid g1 = Guid.NewGuid();   
     Trace.CorrelationManager.ActivityId = g1; 

     Thread.Sleep(3000);   

     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 

     //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation 
     //in effect when the Parallel.For operation was started. 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1)); 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1)); 

     Trace.CorrelationManager.StopLogicalOperation(); 
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
     task = Task.Factory.StartNew(() => 
     { 
      DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }, taskCreationOpt); 
     allTasks[i] = task; 
     } 
     Task.WaitAll(allTasks); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    private static void UseParallelFor(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Parallel.For(0, totalThreads, i => 
     { 
     DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    } 
} 

這整個如果LogicalOperationStack可以用的Parallel.For使用的問題(和/或其他線程/任務構建體)或它如何被使用的可能是值得它自己的問題。也許我會發佈一個問題。同時,我想知道你是否對此有任何想法(或者,我想知道是否考慮過使用LogicalOperationStack,因爲ActivityId看起來是安全的)。

[編輯]

見我的回答this question有關使用LogicalOperationStack和/或CallContext.LogicalSetData與一些不同的線程/線程池/任務/並行contstructs的更多信息。

也看到這裏,我的問題上SO約LogicalOperationStack和並行擴展: Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc

最後,還看到我的問題在這裏對微軟的並行擴展論壇: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

在我的測試中,它看起來像跟蹤。如果您在主線程中啓動邏輯操作,然後在委託中啓動/停止邏輯操作,則CorrelationManager.LogicalOperationStack可能會在使用Parallel.For或Parallel.Invoke時損壞。在我的測試(見上述兩種鏈接)的LogicalOperationStack應該總是恰好有2項時DoLongRunningWork正在執行(如果我使用各種技術DoLongRunningWork的踢前開始在主線程的邏輯運算)。所以,通過「損壞」我的意思是說,LogicalOperationStack最終會有超過2個條目。

從我可以告訴,這可能是因爲和的Parallel.For使用Parallel.Invoke主線程爲「工人」的一個線程來執行DoLongRunningWork動作。

使用存儲在CallContext.LogicalSetData中的堆棧來模擬LogicalOperationStack(類似於通過CallContext.SetData存儲的log4net的LogicalThreadContext.Stacks)的行爲會產生更糟的結果。如果我正在使用這樣的堆棧來維護上下文,那麼在幾乎所有的情況下,它都會被損壞(即沒有預期的條目數),因爲我在主線程中有一個「邏輯操作」,每次迭代都有一個邏輯操作/執行DoLongRunningWork委託。