我將從一個基本解釋我如何理解一對事情的工作,然後用tldr完成全部內容開始,如果人們只是想要達到我在這裏遇到的實際問題。如果我對這裏的任何事情的理解是錯誤的,請糾正我。並行使用TPL中預期的每個雙線程
TPL代表Task Parallel Library,它是.NET 4.0的答案,旨在進一步簡化線程以便於開發人員使用。如果你不熟悉它(在一個非常基礎的層次上),你啓動一個新的Task對象並將它傳遞給一個委託,然後在從線程池獲取的後臺線程上運行(通過使用線程池而不是真正地製作新線程,時間和資源通過使用這些現有線程來保存,而不是創建和處理新線程)。
從我所瞭解的情況來看,C#中的Parallel.ForEach命令會爲它應該執行的每個委託生成一個新線程(很可能來自線程池),並且可能會自動執行一個或多個內聯如果編譯器確定它們會更快地發生以提高效率,那麼可能更多的迭代。
最相關的背景資料,我的目標:
我試圖做一個快速的程序,開始了一個任務與程序的其他部分同時運行。在這個任務中,Parallel.ForEach運行3次。總的來說,我們預計程序現在可以運行5個線程(最多):主線程爲1,實際任務爲1,Parallel.ForEach爲3。每個線程都有自己的目標來完成(儘管Parallel.ForEach都有相同的目標,其相關itemNumber的值有不同的計算值,當主線程完成所有目標時,它使用Task.Wait()等待。在完成任務,等待對Parallel.ForEach完成以及隨後使用的價值和驗證
tldr;實際的問題:
當上述理念運行時,Parallel.ForEach出現因爲SynchronizationContexts(本質上是其他線程的TPL對象)的初始化速度是我期望的並且運行它們的兩倍,但是隻是等待它們的預期數量。因爲Parallel.ForEach()。Wait()命令在預期的線程運行數Ta完成然後sk也完成了,因爲它認爲一切都完成了。然後主程序檢測到Task已經完成,並且驗證當前沒有更多的後臺線程正在運行,偶爾剩下的Parrallel.ForEach()還沒有完成,因此拋出了錯誤。
線程的數量已經過驗證,可以與我在每個SynchronizationContext的post調用(異步方法kicker)上打印到調試窗口中的內容相匹配。每個線程也被一個主線程對象引用,否則它計劃在完成任務時被處置掉,但由於尚未真正期望創建未完成的線程,所以引用仍然存在,因此處理無法正常進行。
Thread testThread = Thread.CurrentThread;
Task backgroundTask = taskFactory.StartNew(() =>
{
Thread rootTaskThread = Thread.CurrentThread;
Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline");
Thread.Sleep(TimeSpan.FromSeconds(2));
Parallel.ForEach(new[] { 1, 2, 3, 4 },
new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => {
Thread.Sleep(TimeSpan.FromSeconds(1));
});
});
在上述例子中,主線程,所述backgroundTask任務,和8個Parallel.ForEach線程最終現有,其中最後9是在SynchronizationContexts創建。
中的SynchronizationContext重寫我的自定義的唯一方法是郵電如下:
public override void Post(SendOrPostCallback d, object state){
Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null;
Request.IAsyncContextData requestData = null;
if (requestOrNull != null){
requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount();
}
Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
base.Post((object internalState) => {
// Capture the spawned thread state and restore the originating thread state
try{
if (requestData != null){
Request.AttachToAsynchronousContext(requestData);
}
d(state);
}
finally{
// Restore original spawned thread state
if (requestData != null){
// Disposes the request if this is the last reference to it
Request.DetachFromAsynchronousContext(requestData);
}
Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
}
}, state);
}
的TaskScheduler,我相信是這樣做只是它所需要的基本的東西:
private readonly RequestSynchronizationContext context;
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>();
public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext)
{
this.context = synchronizationContext;
}
protected override void QueueTask(Task task){
this.tasks.Enqueue(task);
this.context.Post((object state) => {
Task nextTask;
if (this.tasks.TryDequeue(out nextTask))
this.TryExecuteTask(nextTask);
}, null);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){
if (SynchronizationContext.Current == this.context)
return this.TryExecuteTask(task);
else
return false;
}
protected override IEnumerable<Task> GetScheduledTasks(){
return this.tasks.ToArray();
}
TaskFactory :
public RequestTaskFactory(RequestTaskScheduler taskScheduler)
: base(taskScheduler)
{ }
關於爲什麼會發生這種情況的任何想法?
您能發佈一個簡短但完整的示例來顯示您的行爲嗎?特別是你的'SynchronizationContext'(我假設你正在使用一個自定義的)。因爲我沒有看到你描述的行爲。另外,我真的不明白爲什麼使用'Post()'超過您的預期會給您帶來任何問題?你的'SynchronizationContext'是否有一些依賴於此的特殊行爲? – svick 2012-08-08 16:54:28
你想通過這樣做完成什麼? 'Assert.AreNotEqual'表明你正在編寫某種單元測試。但是,'Assert.AreNotEqual'確實驗證了TPL,而不是你的代碼 - 所以,我沒有看到驗證第三方代碼的意義。 – 2012-08-10 18:47:25