0

我有一種情況,即不斷生成新任務並將其添加到ConcurrentBag<Tasks>C#多線程,等待所有任務在新任務不斷添加的情況下完成

我需要等待所有任務完成。

通過WaitAll等待ConcurrentBag中的所有任務是不夠的,因爲在完成上一次等待時任務數量會增加。

此刻我等着它以下列方式:

private void WaitAllTasks() 
{ 
    while (true) 
    { 
     int countAtStart = _tasks.Count(); 
     Task.WaitAll(_tasks.ToArray()); 

     int countAtEnd = _tasks.Count(); 
     if (countAtStart == countAtEnd) 
     { 
      break; 
     } 

     #if DEBUG 
     if (_tasks.Count() > 100) 
     { 
      tokenSource.Cancel(); 
      break; 
     } 
     #endif 
    } 
} 

,我不是很高興與while(true)解決方案。

任何人都可以提出一個更好更有效的方式來做到這一點(而不必用while(true)不斷彙集處理器)


其他方面的信息,在意見中的要求。我不認爲這與這個問題有關。

這段代碼用於網絡爬蟲。爬蟲掃描頁面內容並查找兩種類型的信息。數據頁面和鏈接頁面。數據頁面將被掃描並收集數據,鏈接頁面將被掃描,更多的鏈接將從他們那裏收集。

由於每個任務都會執行活動並找到更多鏈接,因此會將鏈接添加到EventList。列表(以下代碼)上有一個事件OnAdd,用於觸發其他任務以掃描新添加的URL。等等。

當沒有更多正在運行的任務(因此不會再添加鏈接)並且所有項目都已處理完成時,作業完成。

public IEventList<ISearchStatus> CurrentLinks { get; private set; } 
public IEventList<IDataStatus> CurrentData { get; private set; } 
public IEventList<System.Dynamic.ExpandoObject> ResultData { get; set; } 
private readonly ConcurrentBag<Task> _tasks = new ConcurrentBag<Task>(); 

private readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); 
private readonly CancellationToken token; 

public void Search(ISearchDefinition search) 
{ 
    CurrentLinks.OnAdd += UrlAdded; 
    CurrentData.OnAdd += DataUrlAdded; 

    var status = new SearchStatus(search); 

    CurrentLinks.Add(status); 

    WaitAllTasks(); 

    _exporter.Export(ResultData as IList<System.Dynamic.ExpandoObject>); 
} 

private void DataUrlAdded(object o, EventArgs e) 
{ 
    var item = o as IDataStatus; 
    if (item == null) 
    { 
     return; 
    } 

    _tasks.Add(Task.Factory.StartNew(() => ProcessObjectSearch(item), token)); 
} 

private void UrlAdded(object o, EventArgs e) 
{ 
    var item = o as ISearchStatus; 
    if (item==null) 
    { 
     return; 
    } 

    _tasks.Add(Task.Factory.StartNew(() => ProcessFollow(item), token)); 
    _tasks.Add(Task.Factory.StartNew(() => ProcessData(item), token)); 
} 

public class EventList<T> : List<T>, IEventList<T> 
{ 
    public EventHandler OnAdd { get; set; } 
    private readonly object locker = new object(); 
    public new void Add(T item) 
    { 
     //lock (locker) 
     { 
      base.Add(item); 
     } 
     OnAdd?.Invoke(item, null); 
    } 

    public new bool Contains(T item) 
    { 
     //lock (locker) 
     { 
      return base.Contains(item); 
     } 
    } 
} 
+5

我不確定你在做什麼,這可能不是你問題的最佳解決方案,但你應該看看TPL Dataflow。它允許您創建異步管道。 – john

+0

爲什麼新任務不斷產生?你爲什麼把它們添加到'ConcurrentBag'中?爲什麼你需要等待所有任務完成? – PJvG

+0

如果你想等待所有的任務完成,那麼爲什麼你的條件'countAtStart == countAtEnd'而不是'countAtEnd == 0'? – Servy

回答

0

爲什麼不編寫一個函數,在創建時根據需要生成您的任務?這樣你就可以使用Task.WhenAll等待他們完成,或者,我錯過了這一點? See this working here

using System; 
using System.Threading.Tasks; 
using System.Collections.Generic; 

public class Program 
{ 
    public static void Main() 
    { 
     try 
     { 
      Task.WhenAll(GetLazilyGeneratedSequenceOfTasks()).Wait(); 
      Console.WriteLine("Fisnished."); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex); 
     } 
    } 

    public static IEnumerable<Task> GetLazilyGeneratedSequenceOfTasks() 
    { 
     var random = new Random(); 
     var finished = false; 
     while (!finished) 
     { 
      var n = random.Next(1, 2001); 
      if (n < 50) 
      { 
       finished = true; 
      } 

      if (n > 499) 
      { 
       yield return Task.Delay(n); 
      } 

      Task.Delay(20).Wait();    
     } 

     yield break; 
    } 
} 

另外,如果你的問題是不是微不足道的我的答案可能暗示,我會考慮用TPL Dataflow的網格。 BufferBlockActionBlock的組合會讓你非常接近你所需要的。你可以start here


無論哪種方式,我建議你要包括接受CancellationToken或兩個的規定。

+0

我認爲你提供的代碼將不起作用,因爲可能會添加一些額外的任務,並且枚舉將拋出。 – VMAtm

+0

@VMAtm我的想法是,在想要完成序列之前不要'突破'。然而,正如我所說的,對於任何非平凡的解決方案來說,調查TPL Dataflow。 – Jodrell

+0

這不起作用,因爲任務列表隨產量發生而被修改。 –

0

我認爲這項任務可以用TPL Dataflow庫進行非常基本的設置。你需要一個TransformManyBlock<Task, IEnumerable<DataTask>>ActionBlock(可能是更多的人),實際數據處理,像這樣:

// queue for a new urls to parse 
var buffer = new BufferBlock<ParseTask>(); 

// parser itself, returns many data tasks from one url 
// similar to LINQ.SelectMany method 
var transform = new TransformManyBlock<ParseTask, DataTask>(task => 
{ 
    // get all the additional urls to parse 
    var parsedLinks = GetLinkTasks(task); 
    // get all the data to parse 
    var parsedData = GetDataTasks(task); 

    // setup additional links to be parsed 
    foreach (var parsedLink in parsedLinks) 
    { 
     buffer.Post(parsedLink); 
    } 

    // return all the data to be processed 
    return parsedData; 
}); 

// actual data processing 
var consumer = new ActionBlock<DataTask>(s => ProcessData(s)); 

之後,你需要在每個之間的塊鏈接:

buffer.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true }); 
transform.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true }); 

現在你有一個很好的管道,它將在後臺執行。在你意識到你所需要的每件事都被解析的時候,你只需要調用Complete方法來阻止接受新聞消息。經過buffer成了空的,它將在完成向下的管道傳播到transform塊,將它傳播到消費者(S),你需要等待Completion任務:

// no additional links would be accepted 
buffer.Complete(); 
// after all the tasks are done, this will get fired 
await consumer.Completion; 

您可以檢查的那一刻爲完成,例如,如果兩個bufferCount屬性transformInputCounttransform'CurrentDegreeOfParallelism(這是爲TransformManyBlock內部屬性)等於0

但是,我建議你在這裏實現一些額外的邏輯來確定當前的變壓器數量,因爲使用內部邏輯並不是一個好的解決方案。至於取消流水線,您可以創建一個TPL塊,其中包含CancellationToken,可以是所有塊中的一個,也可以是每個塊專用的塊,從中獲取取消。