1

我將從一個基本解釋我如何理解一對事情的工作,然後用tldr完成全部內容開始,如果人們只是想要達到我在這裏遇到的實際問題。如果我對這裏的任何事情的理解是錯誤的,請糾正我。並行使用TPL中預期的每個雙線程

TPL代表Task Parallel Library,它是.NET 4.0的答案,旨在進一步簡化線程以便於開發人員使用。如果你不熟悉它(在一個非常基礎的層次上),你啓動一個新的Task對象並將它傳遞給一個委託,然後在從線程池獲取的後臺線程上運行(通過使用線程池而不是真正地製作新線程,時間和資源通過使用這些現有線程來保存,而不是創建和處理新線程)。

從我所瞭解的情況來看,C#中的Parallel.ForEach命令會爲它應該執行的每個委託生成一個新線程(很可能來自線程池),並且可能會自動執行一個或多個內聯如果編譯器確定它們會更快地發生以提高效率,那麼可能更多的迭代。

最相關的背景資料,我的目標:

我試圖做一個快速的程序,開始了一個任務與程序的其他部分同時運行。在這個任務中,Parallel.ForEach運行3次。總的來說,我們預計程序現在可以運行5個線程(最多):主線程爲1,實際任務爲1,Parallel.ForEach爲3。每個線程都有自己的目標來完成(儘管Parallel.ForEach都有相同的目標,其相關itemNumber的值有不同的計算值,當主線程完成所有目標時,它使用Task.Wait()等待。在完成任務,等待對Parallel.ForEach完成以及隨後使用的價值和驗證

tldr;實際的問題:

當上述理念運行時,Parallel.ForEach出現因爲SynchronizationContexts(本質上是其他線程的TPL對象)的初始化速度是我期望的並且運行它們的兩倍,但是隻是等待它們的預期數量。因爲Parallel.ForEach()。Wait()命令在預期的線程運行數Ta完成然後sk也完成了,因爲它認爲一切都完成了。然後主程序檢測到Task已經完成,並且驗證當前沒有更多的後臺線程正在運行,偶爾剩下的Parrallel.ForEach()還沒有完成,因此拋出了錯誤。

線程的數量已經過驗證,可以與我在每個SynchronizationContext的post調用(異步方法kicker)上打印到調試窗口中的內容相匹配。每個線程也被一個主線程對象引用,否則它計劃在完成任務時被處置掉,但由於尚未真正期望創建未完成的線程,所以引用仍然存在,因此處理無法正常進行。

Thread testThread = Thread.CurrentThread; 
Task backgroundTask = taskFactory.StartNew(() => 
{ 
    Thread rootTaskThread = Thread.CurrentThread; 
    Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline"); 
    Thread.Sleep(TimeSpan.FromSeconds(2)); 

    Parallel.ForEach(new[] { 1, 2, 3, 4 }, 
     new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => { 
     Thread.Sleep(TimeSpan.FromSeconds(1)); 
    }); 
}); 

在上述例子中,主線程,所述backgroundTask任務,和8個Parallel.ForEach線程最終現有,其中最後9是在SynchronizationContexts創建。

中的SynchronizationContext重寫我的自定義的唯一方法是郵電如下:

public override void Post(SendOrPostCallback d, object state){ 
    Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null; 
    Request.IAsyncContextData requestData = null; 

    if (requestOrNull != null){ 
     requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount(); 
    } 

    Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS")); 

    base.Post((object internalState) => { 
     // Capture the spawned thread state and restore the originating thread state 
     try{ 
      if (requestData != null){ 
       Request.AttachToAsynchronousContext(requestData); 
      } 
      d(state); 
     } 
     finally{ 
      // Restore original spawned thread state 
      if (requestData != null){ 
      // Disposes the request if this is the last reference to it 
       Request.DetachFromAsynchronousContext(requestData); 
      } 
     Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS")); 
     } 
    }, state); 
} 

的TaskScheduler,我相信是這樣做只是它所需要的基本的東西:

private readonly RequestSynchronizationContext context; 
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>(); 

public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext) 
{ 
    this.context = synchronizationContext; 
} 

protected override void QueueTask(Task task){ 
    this.tasks.Enqueue(task); 
    this.context.Post((object state) => { 
     Task nextTask; 
     if (this.tasks.TryDequeue(out nextTask)) 
      this.TryExecuteTask(nextTask); 
    }, null); 
} 

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){ 
    if (SynchronizationContext.Current == this.context) 
     return this.TryExecuteTask(task); 
    else 
     return false; 
} 

protected override IEnumerable<Task> GetScheduledTasks(){ 
    return this.tasks.ToArray(); 
} 

TaskFactory :

public RequestTaskFactory(RequestTaskScheduler taskScheduler) 
    : base(taskScheduler) 
{ } 

關於爲什麼會發生這種情況的任何想法?

+0

您能發佈一個簡短但完整的示例來顯示您的行爲嗎?特別是你的'SynchronizationContext'(我假設你正在使用一個自定義的)。因爲我沒有看到你描述的行爲。另外,我真的不明白爲什麼使用'Post()'超過您的預期會給您帶來任何問題?你的'SynchronizationContext'是否有一些依賴於此的特殊行爲? – svick 2012-08-08 16:54:28

+0

你想通過這樣做完成什麼? 'Assert.AreNotEqual'表明你正在編寫某種單元測試。但是,'Assert.AreNotEqual'確實驗證了TPL,而不是你的代碼 - 所以,我沒有看到驗證第三方代碼的意義。 – 2012-08-10 18:47:25

回答

0

任務本身不會創建線程。 TaskScheduler決定如何操作以使操作異步(如果可以的話)。例如一些操作使用異步IO,在這種情況下,硬件使其異步,而不是另一個工作線程。

連續調用的方式取決於同步上下文。該上下文不是另一個線程,它只是抽象操作可以運行的標準。在WPF,WinForms,Silverlight等中,例如有一個UI同步上下文需要在特定線程(UI線程或主線程,以避免異常)上執行操作。

ForEach將嘗試來創建線程(更具體地說,它會嘗試詢問同步上下文以啓動多個異步操作)。調度程序確定它是如何做到的。如果你給它三項任務,它可能是創建三個線程或它可能不是。它決定三個併發線程是否是好事。例如,如果您只有兩個內核,ForEach不會創建兩個以上的線程,因爲使用單個線程並由於上下文切換開銷而按順序運行代碼可能會更糟糕。

目前還不清楚「初始化SynchronizationContext的兩倍」是什麼意思。這些不是線程。你是否僅僅意味着它創造的線程超出了你的預期?或者你的意思是郵政被稱爲超過你的預期?什麼是你的SynchronizationContext類基於? (即它的基類是什麼)。基地確實在很大程度上定義了Post調用將會如何。它可能覺得需要創建另一個異步操作來跟蹤其他操作......您如何讓調度程序使用此上下文?

SynchronizationContext在TPL(首先出現在.NET 2.0中)之前就已經存在。它所做的一件事是管理異步操作請求。從您發帖後不清楚您是否理解這一點。

更新: 第一次調用QueueTask來自StartNew。 對QueueTask的第二次調用來自間接調用ForEach 第三次調用QueueTask間接來自QueueTask中的TryExecuteTask 對QueueTask的後續4次調用是將正文傳遞給ForEach。

根據負載的不同,QueueTask可能最多再調用3次。如果我在QueueTask上調試並中斷,則QueueTask只會被調用7次。

此時,由於您在QueueTask中執行與TPL不同的操作(即,TryExecuteTask調用額外的操作)很難說爲什麼有時會有一些額外的QueueTask調用。這可能來自您實施QueueTask的方式,因爲您正在有效地要求調度程序從已異步執行的任務中排隊另一個異步任務。我的猜測是,這只是時機。可以快速調用QueueTask(因爲這是從另一個異步操作完成的),TryExecuteTask不知道任務已排隊,並強制任務執行(強制調用另一個調用QueueTask)。

如果QueueTask事實上正在導致對QueueTask的另一個調用,因爲它尚未調度它所在的任務,這就解釋了爲什麼最多有10個對QueueTask的調用。這是TryExecteTask調用導致每個ForEach正文「雙」調用...

+0

My SynchronizationContext直接擴展了基本的SynchronizationContext,除了post之外,沒有其他方法被覆蓋。 是的,帖子被稱爲超過我的預期。對於ForEach循環中的每次迭代,都會進行2次post調用。或者最起碼,它是這樣的:一個ForEach大小爲3的結果總共有7個post調用,size 4的結果是9個post調用,等等...... 基本的TaskScheduler也用於基本的TaskFactory,沒有no重寫的方法。 你是對的,我沒有意識到SynchronizationContext在TPL之前就已經存在。 – 2012-08-09 15:13:29

+0

TaskScheduler是一個抽象類;你如何實現這些抽象方法?另外,你如何實現TaskFactory?你在做與TPL不同的事情。你如何設置你的TaskFactory的調度器屬性?我不知道你是否在使用默認的任務調度器(它是SynchronizationContextTaskScheduler或ThreadPoolTask​​Scheduler),所以它看起來就像你在比較蘋果和橙子。 – 2012-08-09 15:53:44

+0

添加了我的TaskScheduler實現,TaskFactory與我添加的一樣簡單。至於任何TaskFactory調度器屬性,這將是我不知道必須做的事情? – 2012-08-09 16:55:36