2012-07-11 29 views
4

我想使用任務並行庫(SL5具有任務工廠,但沒有Parallel.For)在Silverlight 5.0應用程序中實現以下內容。我有足夠的線程知識,但沒有對TPL所以這似乎是一個很好的任務中獲得一些:)多個操作的並行化和結果的並置

目前,我有一些代碼,它執行同步如下:

public interface IProcessor 
{ 
    IEnumerable<Bar> Provide(Foo param) 
} 

private IEnumerable<IProcessor> processors; 

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback) 
{ 
    List<Bar> allResults = new List<Bar>(); 

    foreach(var processor in this.processors) 
    { 
     allResults.AddRange(processor.Provide(param)); 
    } 

    callback(allResults); 
} 

考慮每個IProcessor接受Foo參數Provide返回IEnumerable<Bar>。所有結果的聚合都通過回調發回給調用者。

現在一些IP處理器立即執行。有些人打電話給服務器,可能需要幾秒鐘的時間。我想爲N IProcessor個實例安排N個任務,並在所有完成(或超時)時連接IEnumerable<Bar>結果。

如果可能我想增加一個超時到整體操作,所以如果在15秒內沒有完成,拋出。

你的幫助非常感謝:)

+0

當'IProcessor'顯然是一個同步接口時它們怎麼會是異步的? – svick 2012-07-11 11:58:31

+0

對不起,只是重構了與IProcessor的接口同步(阻塞)。我將重新提出這個問題 – 2012-07-11 12:49:51

回答

4

再次,我不能對此進行測試代碼,但它應該工作,如果Silverlight不具有Parallel.ForEach你可以使用Task.WaitAll

private IEnumerable<IProcessor> processors; 

    public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback) 
    { 
     var allResults = new ConcurrentQueue<Bar>(); 
     Task.WaitAll(processors.Select(processor => Task.Factory.StartNew(() => GetData(processor, param, allResults))).ToArray()); 
     callback(allResults); 
    } 

    private static void GetData(IProcessor processor, Foo param, ConcurrentQueue<Bar> allResults) 
    { 
     var enumerable = processor.Provide(param); 
     foreach (var bar in enumerable) 
     { 
      allResults.Enqueue(bar); 
     } 
    } 
+0

Ach,它的Silverlight沒有Parallel.ForEach。對於努力,仍然+1 +1 – 2012-07-11 13:44:09

+0

使用此代碼,每個處理器的結果可以交錯。這可能也可能不是問題。 – svick 2012-07-11 13:47:59

+0

@svick無論如何,我需要做一些訂購,這不是問題。什麼是Parallel.ForEach在silverlight 5中不可用。您可以通過以下方式創建任務:-) – 2012-07-11 13:57:49

0

隨着TPL可以傳遞循環狀態以通知其他線程在超時的情況下中止。你需要做的是這樣的:

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback) 
    { 
     ConcurrentBag<Bar> allResults = new ConcurrentBag<Bar>(); 

     Stopwatch sw = new Stopwatch(); 
     sw.Start(); 

     Parallel.ForEach(this.processors, (processor, loopState) => 
     { 
      foreach (Bar item in processor.Provide(param)) 
      { 
       allResults.Add(item); 
      } 

      if (sw.ElapsedMilliseconds > 15000) 
      { 
       loopState.Stop(); 
       throw new TimeoutException(); 
      } 
     }); 

     callback(allResults); 
    } 
1

我認爲這是大致正確

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback) 
{ 
    var allResults = new List<Bar>(); 

    // We are using all the default options on the TaskFactory 
    // except when we are appending the results this has to be synchronized 
    // as List<> is not multithreading safe (a more appropriate collection perhaps) 
    var taskFactory = new TaskFactory<IEnumerable<Bar>>(
     TaskCreationOptions.None, 
     TaskContinuationOptions.ExecuteSynchronously); 

    // Kick off a task for every processor 
    var tasks = 
     new List<Task<IEnumerable<Bar>>>(processors.Count()); 
    tasks.AddRange(
     processors.Select(
      processor => 
      taskFactory.StartNew(() => processor.Provide(param)))); 

    if (Task.WaitAll(tasks.ToArray(), 5 * 1000)) 
    { 
     foreach (Task<IEnumerable<Bar>> task in tasks) 
     { 
      allResults.AddRange(task.Result); 
     } 
     callback(allResults); 
    } 
} 
+0

這將逐個運行任務,更正嗎? – sll 2012-07-11 14:35:57

+0

@sll不,返回類型的.StartNew()是一個任務(很大程度上立即返回)。該任務是異步啓動的。 – 2012-07-11 14:42:14

+0

只有一個問題忽略了很好的'ContinueWith' - 在多個異步操作中使用相同的List實例是很危險的,所以你必須在'ContinueWith'中擴展匿名方法或者使用其中一個沒有'AddRange'的關聯集合。 – sll 2012-07-11 14:48:36

1

這將異步並行運行所有任務:

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback) 
{ 
    // since each task's callback would access this storage - we are using 
    // one of the concurrent queue 
    ConcurrentQueue<Bar> allResults = new ConcurrentQueue<Bar>(); 

    Task[] tasks = this.processors.Select(p => new Task(() => 
     { 
      IEnumerable<Bar> results = p.Provide(param); 
      foreach (var newItem in results) 
      { 
       allResults.Enqueue(newItem); 
      } 
     })).ToArray(); 

    foreach (var task in tasks) 
    { 
     task.Start(); 
    } 

    // 5 seconds to wait or inject a value into this method 
    Task.WaitAll(tasks, 5000);     
    callback(allResults); 
} 
1

爲什麼不使用類似下面的

// Allow for cancellation. 
CancellationTokenSource cancelSource = new CancellationTokenSource(); 
CancellationToken token = new CancellationToken(); 
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; 

List<Bar> allResults = new List<Bar>(); 
Task<List<Bar>> asyncTask = Task.Factory.StartNew<List<Bar>>(() => asyncMethod(token, atp), token); 

// Continuation is returned when the asyncMethod is complete. 
asyncTask.ContinueWith(task => 
{ 
    // Handle the result. 
    switch (task.Status) 
    { 
     // Handle any exceptions to prevent UnobservedTaskException.    
     case TaskStatus.RanToCompletion: 
      break; 
     case TaskStatus.Canceled: 
      break; 
     case TaskStatus.Faulted: 
    } 
} 

在你asyncMethod你可以做類似下面

private List<Bar> asyncMethod(CancellationToken token) 
{ 
    List<Bar> allResults = new List<Bar>(); 

    foreach(var processor in this.processors) 
    { 
     Task.Factory.StartNew<IEnumerable<Bar>>(() => 
     {  
      processor.Provide(param); 
     }, atp).ContinueWith(cont => { allResults.AddRange(cont.Result) }); 

     // Cancellation requested from UI Thread. 
     if (token.IsCancellationRequested) 
      token.ThrowIfCancellationRequested(); 
    } 
    return allResults; 
} 

溜溜的東西,然後可以從在第一個片段名爲task延續整體結果(List<Bar>)。你叫通過一些活動取消通過UI類似

// Cancellation requested from UI Thread. 
if (token.IsCancellationRequested) 
    token.ThrowIfCancellationRequested(); 

我無法測試這一點,但像上面應該工作。請參閱此great introduction to TPL欲瞭解更多信息和使用該類...

我希望這是有用的。