請原諒我的這個帖子作爲一個答案,因爲它不是真的回答你的問題,但是,它涉及到你的問題,因爲它涉及CorrelationManager行爲和線程/任務/等。我一直在考慮使用CorrelationManager的LogicalOperationStack
(和StartLogicalOperation/StopLogicalOperation
方法)在多線程場景中提供額外的上下文。
我把你的例子稍加修改,添加使用Parallel.For並行執行工作的能力。此外,我使用StartLogicalOperation/StopLogicalOperation
括號(內部)DoLongRunningWork
。從概念上講,DoLongRunningWork
做這樣的事情是每次執行:
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
我發現,如果我添加這些邏輯操作代碼(或多或少是),所有的邏輯operatins的保持同步(總是預期的堆棧操作次數和堆棧操作的值始終如預期)。
在我的一些測試中,我發現並非總是如此。邏輯操作堆棧正在「損壞」。我可以想到的最好的解釋是,當「子」線程退出時,將「CallContext」信息「合併」回「父」線程上下文導致「舊」子線程上下文信息(邏輯操作)爲「繼承「由另一個兄弟的子線程。
這個問題也可能與以下事實有關:Parallel.For顯然使用主線程(至少在示例代碼中,如寫入的那樣)作爲「工作線程」之一(或者任何它們應該在並行域)。無論何時執行DoLongRunningWork,都會啓動一個新的邏輯操作(開始時)並停止(結束時)(也就是將其壓入LogicalOperationStack並從中彈出)。如果主線程已經具有邏輯操作並且DoLongRunningWork在主線程上執行,則啓動新的邏輯操作,因此主線程的LogicalOperationStack現在具有兩個操作。任何後續的DoLongRunningWork執行(只要DoLongRunningWork的這個「迭代」在主線程上執行)將(顯然)繼承主線程的LogicalOperationStack(現在有兩個操作,而不僅僅是一個預期的操作)。
我花了很長時間才弄清楚爲什麼LogicalOperationStack的行爲在我的示例中與我的示例的修改版本中的行爲不同。最後,我看到在我的代碼中,我用邏輯操作括住了整個程序,而在我的修改版本的測試程序中,我沒有。其含義是,在我的測試程序中,每次執行我的「工作」(類似於DoLongRunningWork),已經有一個合理的操作。在我的測試程序的修改版本中,我沒有將邏輯操作中的整個程序括起來。
所以,當我修改你的測試程序來將邏輯操作中的整個程序括起來,並且如果我使用的是Parallel.For,我遇到了完全相同的問題。
使用上面的概念模型,這將成功運行:
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
雖然這最終將導致斷言到一個明顯的不同步LogicalOperationStack的:
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
這裏是我的示例程序。它和你的類似,它有一個操作ActivityId和LogicalOperationStack的DoLongRunningWork方法。我也有兩種踢DoLongRunningWork的風格。一種風味使用任務,一種使用Parallel.For。也可以執行每種風格,以使得整個並行操作被封閉在邏輯操作中或不被操作。所以,總共有4種方式來執行並行操作。要嘗試每一個,只需取消註釋所需的「使用...」方法,重新編譯並運行。 UseTasks
,UseTasks(true)
和UseParallelFor
應該全部運行完成。 UseParallelFor(true)
將在某個時刻斷言,因爲LogicalOperationStack沒有預期的條目數。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
這整個如果LogicalOperationStack可以用的Parallel.For使用的問題(和/或其他線程/任務構建體)或它如何被使用的可能是值得它自己的問題。也許我會發佈一個問題。同時,我想知道你是否對此有任何想法(或者,我想知道是否考慮過使用LogicalOperationStack,因爲ActivityId看起來是安全的)。
[編輯]
見我的回答this question有關使用LogicalOperationStack和/或CallContext.LogicalSetData與一些不同的線程/線程池/任務/並行contstructs的更多信息。
也看到這裏,我的問題上SO約LogicalOperationStack和並行擴展: Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc
最後,還看到我的問題在這裏對微軟的並行擴展論壇: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9
在我的測試中,它看起來像跟蹤。如果您在主線程中啓動邏輯操作,然後在委託中啓動/停止邏輯操作,則CorrelationManager.LogicalOperationStack可能會在使用Parallel.For或Parallel.Invoke時損壞。在我的測試(見上述兩種鏈接)的LogicalOperationStack應該總是恰好有2項時DoLongRunningWork正在執行(如果我使用各種技術DoLongRunningWork的踢前開始在主線程的邏輯運算)。所以,通過「損壞」我的意思是說,LogicalOperationStack最終會有超過2個條目。
從我可以告訴,這可能是因爲和的Parallel.For使用Parallel.Invoke主線程爲「工人」的一個線程來執行DoLongRunningWork動作。
使用存儲在CallContext.LogicalSetData中的堆棧來模擬LogicalOperationStack(類似於通過CallContext.SetData存儲的log4net的LogicalThreadContext.Stacks)的行爲會產生更糟的結果。如果我正在使用這樣的堆棧來維護上下文,那麼在幾乎所有的情況下,它都會被損壞(即沒有預期的條目數),因爲我在主線程中有一個「邏輯操作」,每次迭代都有一個邏輯操作/執行DoLongRunningWork委託。
我沒有什麼優惠,但我也對這個問題感興趣。似乎同樣的問題也適用於使用CallContext.LogicalSetData的信息集,因爲這是Trace.CorrelationManager用來存儲ActivityId和LogicalOperationStack的技術。 – wageoghe 2010-12-14 19:47:08
@wageohe - 我終於可以今天這個測試,已經張貼了我的結果:) – 2010-12-15 00:35:58
我張貼一些更多的細節在我的答案。我還在這裏發佈了一個關於SO的另一個答案的鏈接,這是我在這裏問到的一個新問題,以及我在Microsoft的Parallel Extensions論壇上提問(但尚未在2011年1月21日回答)的問題。也許你會發現有用的信息,也許不會。 – wageoghe 2011-01-21 15:54:47