2011-07-01 84 views
6

在下面的代碼中,我想同步任務列表的結果報告。這是因爲task.Result阻塞,直到任務完成。但是,任務ID = 3需要很長時間才能完成,並阻止所有其他已完成的任務報告其狀態。沒有UI線程的任務同步

我認爲我可以通過將報告(Console.Write)移動到.ContinueWith指令中,但我沒有UI線程,因此如何獲取TaskScheduler來同步.ContinueWith任務?

我現在擁有的一切:

static void Main(string[] args) 
{ 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 

    var tasks = new List<Task<int>>(); 

    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     var t = Task<int>.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 5000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      return num; 
     }); 
     tasks.Add(t); 
    } 

    foreach (var task in tasks) 
    { 
     Console.WriteLine("Completed {0} on {1}", task.Result, Thread.CurrentThread.ManagedThreadId); 
    } 

    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
} 

我想移動到這個或類似的東西,但我需要的Console.Write(「已完成...」),以一切發生在同一線程上:

static void Main(string[] args) 
{ 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 

    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     Task<int>.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 10000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      return num; 
     }).ContinueWith(value => 
     { 
      Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId); 
     } 

    /* need syncronization context */); 
    } 

    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
} 

- 解決方案 - 得到一些意見和閱讀部分的解決方案,這是完整的解決方案,我想要做什麼之後。這裏的目標是儘可能快地處理長時間運行的任務,然後對每個任務的結果進行一次處理。

static void Main(string[] args) 
{ 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 

    var results = new BlockingCollection<int>(); 

    Task.Factory.StartNew(() => 
    { 
     while (!results.IsCompleted) 
     { 
      try 
      { 
       var x = results.Take(); 
       Console.WriteLine("Completed {0} on {1}", x, Thread.CurrentThread.ManagedThreadId); 
      } 
      catch (InvalidOperationException) 
      { 
      } 
     } 
     Console.WriteLine("\r\nNo more items to take."); 
    }); 

    var tasks = new List<Task>(); 

    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     var t = Task.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 10000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      results.Add(num); 
     }); 

     tasks.Add(t); 
    } 

    Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => results.CompleteAdding()); 

    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
} 
+0

我假設控制檯線程是一個替身的GUI(的WinForms/WPF)。這不是一個好主意,Dispatcher/Messageloop的存在會造成(很大的)差異。 –

+1

否則,請思考「在同一個線程上發生的事情」。除非該線程正在輪詢,否則不能這樣做。 –

+0

我需要對其進行更改,以便一次只運行一個ContinueWith任務。不管他們是否在同一個線程上運行,但我不能讓他們中的兩個並行運行。在現實生活中,ContinueWith部分將大量數據寫入數據庫。 –

回答

1

你必須建立某種形式的作家的任務,但是,請記住,即使任務可以被重新安排到其他本地或託管線程!使用TPL中的默認調度程序,您無法控制哪個託管線程接收工作。

public class ConcurrentConsole 
{ 
    private static BlockingCollection<string> output 
     = new BlockingCollection<string>(); 

    public static Task CreateWriterTask(CancellationToken token) 
    { 
     return new Task(
      () => 
      { 
       while (!token.IsCancellationRequested) 
       { 
        string nextLine = output.Take(token); 
        Console.WriteLine(nextLine); 
       } 
      }, 
      token); 
    } 

    public static void WriteLine(Func<string> writeLine) 
    { 
     output.Add(writeLine()); 
    } 
} 

當我打開你的代碼中使用這個我收到了以下的輸出:

End of Main 
Done 1 on 6 
Completed 1 on 6 
Done 5 on 9 
Completed 5 on 9 
Done 0 on 4 
Completed 0 on 4 
Done 2 on 5 
Completed 2 on 13 
Done 7 on 10 
Completed 7 on 10 
Done 4 on 8 
Completed 4 on 5 
Done 9 on 12 
Completed 9 on 9 
Done 6 on 6 
Completed 6 on 5 
Done 8 on 11 
Completed 8 on 4 
Done 3 on 7 
Completed 3 on 7 

即使你的代碼發送() => String.Format("Completed {0} on {1}"...ConcurrentConsole.WriteLine,確保ManagedThreadId將是對ConcurrentConsole任務回升,它仍然會改變它運行的線程。儘管比正在執行的任務更少變化。

+0

這實際上比我的foreach任務循環更糟糕。在報告其中任何一項的結果之前,它等待所有任務完成。我正在尋找的是一種完成任務並完成該任務並將該報告同步到一個線程的方法。我不希望一個長時間運行的任務阻止其他人報告。 –

+0

@Ryan:我已經將報告添加到了特定的任務中,這是您可以在TPL中保證的最好的任務。 – user7116

+0

這不完全是我如何實現系統,但使用BlockingCollection的想法是實現的核心。 –

0

我建議:

1)創建的鎖定對象
2)創建一個字符串列表寫入
3)因爲它產生的循環,睡了一下線程,然後鎖定字符串列表,然後如果它不是空的,寫入所有這些並清空列表
4)其他線程然後鎖定列表,添加它們的狀態,解鎖並繼續。

object writeListLocker = new object(); 
List<string> linesToWrite = new List<string>(); 

// Main thread loop 
for (; ;) 
{ 
    lock (writerListLocker) 
    { 
     foreach (string nextLine in linesToWrite) 
      Console.WriteLine(nextLine); 
     linesToWrite.Clear(); 
    } 
    Thread.Sleep(500); 
} 

// Reporting threads 
lock (writerListLocker) 
{ 
    linesToWrite.Add("Completed (etc.)"); 
} 
+0

我想繼續使用TPL,如果我能夠。它看起來並不像上面所說的那樣,在一系列線程上執行處理,而在一個線程上同步寫入結果。 –

+0

但是,這正是這樣做。主線程循環完成所有的寫操作。其他線程全部添加到linesToWrite對象而不是寫入。 –

1

您可以使用OrderedTaskScheduler確保一次只運行一個任務;但是,它們將在線程池線程上運行(不一定全部在同一個線程上)。

如果你真的需要在同一個線程上(而不是一次一個),那麼你可以使用ActionThreadNito.Async library。它爲其代碼提供SynchronizationContext,可以通過FromCurrentSynchronizationContext找到。

+0

這是我想去的方向。我想將ContinueWith操作放到單個線程中,以確保一次只能運行其中的一個。不知道我是否可以在生產中使用OrderedTaskScheduler。我想使用FromCurrentSynchronizationContext,但我不能因爲我處於服務環境(a.k.a.沒有UI線程)而SyncronizationContext.Current返回null。 –

+0

我認爲'OrderedTaskScheduler'和'ActionThread'都是生產質量。聽起來他們中的任何一個都可以解決你的問題。 –

0

我想你會期待像下面這樣的結果。

Starting on 8 
Done 1 on 11 
Completed 1 on 9 
Done 5 on 11 
Completed 5 on 9 
Done 0 on 10 
Completed 0 on 9 
Done 2 on 12 
Completed 2 on 9 
Done 7 on 16 
Completed 7 on 9 
Done 4 on 14 
Completed 4 on 9 
Done 9 on 18 
Completed 9 on 9 
Done 6 on 15 
Completed 6 on 9 
Done 8 on 17 
Completed 8 on 9 
Done 3 on 13 
Completed 3 on 9 

如下,我用StaSynchronizationContext在我的代碼從the Understanding SynchronizationContext其中一個線程同步調用很好地解釋。請參考它。

我的代碼片斷是:

static void Main(string[] args) 
{ 
    StaSynchronizationContext context = new StaSynchronizationContext(); 
    StaSynchronizationContext.SetSynchronizationContext(context); 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 
    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     Task<int>.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 10000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      return num; 
     }).ContinueWith(
     value => 
     { 
      Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId); 
     } 
     ,TaskScheduler.FromCurrentSynchronizationContext()); 
    } 
    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
}