我有一種情況,即不斷生成新任務並將其添加到ConcurrentBag<Tasks>
。C#多線程,等待所有任務在新任務不斷添加的情況下完成
我需要等待所有任務完成。
通過WaitAll
等待ConcurrentBag
中的所有任務是不夠的,因爲在完成上一次等待時任務數量會增加。
此刻我等着它以下列方式:
private void WaitAllTasks()
{
while (true)
{
int countAtStart = _tasks.Count();
Task.WaitAll(_tasks.ToArray());
int countAtEnd = _tasks.Count();
if (countAtStart == countAtEnd)
{
break;
}
#if DEBUG
if (_tasks.Count() > 100)
{
tokenSource.Cancel();
break;
}
#endif
}
}
,我不是很高興與while(true)
解決方案。
任何人都可以提出一個更好更有效的方式來做到這一點(而不必用while(true)
不斷彙集處理器)
其他方面的信息,在意見中的要求。我不認爲這與這個問題有關。
這段代碼用於網絡爬蟲。爬蟲掃描頁面內容並查找兩種類型的信息。數據頁面和鏈接頁面。數據頁面將被掃描並收集數據,鏈接頁面將被掃描,更多的鏈接將從他們那裏收集。
由於每個任務都會執行活動並找到更多鏈接,因此會將鏈接添加到EventList
。列表(以下代碼)上有一個事件OnAdd
,用於觸發其他任務以掃描新添加的URL。等等。
當沒有更多正在運行的任務(因此不會再添加鏈接)並且所有項目都已處理完成時,作業完成。
public IEventList<ISearchStatus> CurrentLinks { get; private set; }
public IEventList<IDataStatus> CurrentData { get; private set; }
public IEventList<System.Dynamic.ExpandoObject> ResultData { get; set; }
private readonly ConcurrentBag<Task> _tasks = new ConcurrentBag<Task>();
private readonly CancellationTokenSource tokenSource = new CancellationTokenSource();
private readonly CancellationToken token;
public void Search(ISearchDefinition search)
{
CurrentLinks.OnAdd += UrlAdded;
CurrentData.OnAdd += DataUrlAdded;
var status = new SearchStatus(search);
CurrentLinks.Add(status);
WaitAllTasks();
_exporter.Export(ResultData as IList<System.Dynamic.ExpandoObject>);
}
private void DataUrlAdded(object o, EventArgs e)
{
var item = o as IDataStatus;
if (item == null)
{
return;
}
_tasks.Add(Task.Factory.StartNew(() => ProcessObjectSearch(item), token));
}
private void UrlAdded(object o, EventArgs e)
{
var item = o as ISearchStatus;
if (item==null)
{
return;
}
_tasks.Add(Task.Factory.StartNew(() => ProcessFollow(item), token));
_tasks.Add(Task.Factory.StartNew(() => ProcessData(item), token));
}
public class EventList<T> : List<T>, IEventList<T>
{
public EventHandler OnAdd { get; set; }
private readonly object locker = new object();
public new void Add(T item)
{
//lock (locker)
{
base.Add(item);
}
OnAdd?.Invoke(item, null);
}
public new bool Contains(T item)
{
//lock (locker)
{
return base.Contains(item);
}
}
}
我不確定你在做什麼,這可能不是你問題的最佳解決方案,但你應該看看TPL Dataflow。它允許您創建異步管道。 – john
爲什麼新任務不斷產生?你爲什麼把它們添加到'ConcurrentBag'中?爲什麼你需要等待所有任務完成? – PJvG
如果你想等待所有的任務完成,那麼爲什麼你的條件'countAtStart == countAtEnd'而不是'countAtEnd == 0'? – Servy