2013-03-01 42 views
0

給定以下代碼,是否可以爲Task doThing的實例定義調度程序,創建和延續設置?如何計劃任務以使它們不同時運行,包括異步延續

我希望能夠安排doThing的多個實例,以便它們實際上僅從其他實例運行(即使它們正在等待其他子任務)。

private static async Task doThing(object i) 
    { 
     Console.WriteLine("in do thing {0}", (int)i); 
     await Task.Delay(TimeSpan.FromSeconds(5)); 
     Console.WriteLine("out of do thing {0}", (int)i); 
    } 
    static void Main(string[] args) 
    { 

     CancellationTokenSource source = new CancellationTokenSource(); 
     ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Current); 

     Task Task1 = Task.Factory.StartNew((Func<object,Task>)doThing, 1, source.Token, TaskCreationOptions.AttachedToParent, pair.ExclusiveScheduler).Unwrap(); 
     Task Task2 = Task.Factory.StartNew((Func<object, Task>)doThing, 2, source.Token, TaskCreationOptions.AttachedToParent, pair.ExclusiveScheduler); 
     Task Task3 = doThing(3); 
     Task Task4 = Task.Factory.StartNew(async (i) => 
     { 
      Console.WriteLine("in do thing {0}", (int)i); 
      await Task.Delay(TimeSpan.FromSeconds(5)); 
      Console.WriteLine("out of do thing {0}", (int)i); 
     }, 4, source.Token, TaskCreationOptions.None, pair.ExclusiveScheduler); 
     Task.WaitAll(Task1, Task2, Task3, Task4); 
     Console.ReadKey(); 
     return; 
    } 
+0

你的意思是'Task2'應該在Task1完成後才能啓動? – svick 2013-03-01 22:01:56

+0

是的。我知道'.ContinueWith',但我不關心它們執行的順序,只要它們不同時執行。 – Jake 2013-03-01 22:19:47

+0

你的「非lambda」任務有什麼意義?在你上面的例子中,雖然你不使用lambda,但是你在任務中使用委託,這是唯一重要的事情。 Lambdas只是語法糖。 – 2013-03-03 17:15:00

回答

4

TPL TaskSchedulers一次只能看到一個異步方法的同步段,所以你不能簡單地用調度器來完成。但你可以做到這一點,使用更高級別的基元。我經常使用的是TPL Dataflow。

首先,安裝NuGet包:

Install-Package Microsoft.Tpl.Dataflow 

然後使用此代碼:

private static async Task doThing(object i) { 
    Console.WriteLine("in do thing {0}", (int)i); 
    await Task.Delay(TimeSpan.FromSeconds(5)); 
    Console.WriteLine("out of do thing {0}", (int)i); 
} 

static void Main(string[] args) { 
    CancellationTokenSource source = new CancellationTokenSource(); 
    var exclusivityBlock = new ActionBlock<Func<Task>>(f => f(), new ExecutionDataflowBlockOptions { CancellationToken = source.Token }}; 
    exclusivityBlock.Post(() => doThing(1)); 
    exclusivityBlock.Post(() => doThing(2)); 
    exclusivityBlock.Post(() => doThing(3)); 
    exclusivityBlock.Post(
     async() => { 
      Console.WriteLine("in do thing {0}", 4); 
      await Task.Delay(TimeSpan.FromSeconds(5)); 
      Console.WriteLine("out of do thing {0}", 4); 
     }); 
    exclusivityBlock.Complete(); 
    exclusivityBlock.Completion.Wait(); 
    Console.WriteLine("Done"); 
    Console.ReadKey(); 
    return; 
} 

此代碼缺少對每個投遞工作項目單個任務。如果這是重要的,你可以使用此示例:

internal static class Program { 
    private static async Task doThing(object i) { 
     Console.WriteLine("in do thing {0}", (int)i); 
     await Task.Delay(TimeSpan.FromSeconds(5)); 
     Console.WriteLine("out of do thing {0}", (int)i); 
    } 

    private static void Main(string[] args) { 
     CancellationTokenSource source = new CancellationTokenSource(); 
     var exclusivityBlock = CreateTrackingBlock<Func<Task>>(
      f => f(), new ExecutionDataflowBlockOptions { CancellationToken = source.Token }); 
     var task1 = exclusivityBlock.PostWithCompletion(() => doThing(1)); 
     var task2 = exclusivityBlock.PostWithCompletion(() => doThing(2)); 
     var task3 = exclusivityBlock.PostWithCompletion(() => doThing(3)); 
     var task4 = exclusivityBlock.PostWithCompletion(
      async() => { 
       Console.WriteLine("in do thing {0}", 4); 
       await Task.Delay(TimeSpan.FromSeconds(5)); 
       Console.WriteLine("out of do thing {0}", 4); 
      }); 

     Task.WaitAll(task1, task2, task3, task4); 
     Console.WriteLine("Done"); 
     Console.ReadKey(); 
     return; 
    } 

    private static ActionBlock<Tuple<T, TaskCompletionSource<object>>> CreateTrackingBlock<T>(Func<T, Task> action, ExecutionDataflowBlockOptions options = null) { 
     return new ActionBlock<Tuple<T, TaskCompletionSource<object>>>(
      async tuple => { 
       try { 
        await action(tuple.Item1); 
        tuple.Item2.TrySetResult(null); 
       } catch (Exception ex) { 
        tuple.Item2.TrySetException(ex); 
       } 
      }, 
      options ?? new ExecutionDataflowBlockOptions()); 
    } 

    internal static Task PostWithCompletion<T>(this ActionBlock<Tuple<T, TaskCompletionSource<object>>> block, T value) { 
     var tcs = new TaskCompletionSource<object>(); 
     var tuple = Tuple.Create(value, tcs); 
     block.Post(tuple); 
     return tcs.Task; 
    } 
} 

不過請注意,這僅僅是更費力一些,因爲數據流是不是主要設計用於追蹤個人意見,而是整個過程。所以雖然上述工作很好,但Stephen Cleary的答案可能更簡單,因此更可取。

+0

+1。我學到了一個新的竅門! :) – 2013-03-03 18:27:19

+0

此外,您可以使用塊選項來指定一個'TaskScheduler'。從技術上講,這是一個lambda任務,但它仍然是一個很棒的技術。 – 2013-03-03 22:18:14

+0

是的,在選項中有很多好東西。至於'lambda任務',正如我對這個問題本身所做的評論一樣,它是無關緊要的。方法組和lambda任務幾乎適用於所有目的。 – 2013-03-03 22:40:51

2

考慮下面的代碼,是不是可以定義調度,創建和延續設置任務doThing的實例?

壞消息是:不,沒有辦法做到這一點。爲非lambda任務定義「調度程序」是沒有意義的。創建選項不是必需的,繼續選項設置在繼續,而不是任務本身。

好消息是:你不需要這種行爲。

您想異步同步。內置的方式做到這一點是使用SemaphoreSlim,因爲這樣的:

SemaphoreSlim mutex = new SemaphoreSlim(1); 
private static async Task doThingAsync(object i) 
{ 
    await mutex.WaitAsync(); 
    try 
    { 
     Console.WriteLine("in do thing {0}", (int)i); 
     await Task.Delay(TimeSpan.FromSeconds(5)); 
     Console.WriteLine("out of do thing {0}", (int)i); 
    } 
    finally 
    { 
     mutex.Release(); 
    } 
} 

就個人而言,我覺得finally語法是尷尬,所以我定義IDisposable和使用using代替。

如果您需要更多功率,Stephen Toub有一個async coordination primitives series,我有一個full suite of primitives in my AsyncEx library。這兩種資源都包含AsyncLockTask<IDisposable> WaitAsync()成員,因此您可以使用using而不是finally

+0

我不同意,當然有一種方法可以避免任務之間的併發。我會添加一個我自己的答案。 – 2013-03-03 17:12:53