0
我有一個ImageProcessor類,它爲這些任務中的每個圖像提供程序創建一個任務列表,然後爲每個圖像運行一個parallel.foreach循環提供者,我想能夠從控制檯取消所有任務和嵌套並行循環,我已經找到了如何取消任務以及如何取消並行循環的示例,但我不確定如何爲嵌套進程執行此操作。如何從任務中取消parallel.foreach循環
下面是我目前所面對的
從我的控制檯應用程序的代碼:
using (IUnityContainer container = Bootstrapper.Initialise())
{
IImageProcessor processor = container.Resolve<IImageProcessor>();
processor.Container = container;
try
{
InputArguments arguments = new InputArguments(args);
if (arguments.Contains("fs"))
{
processor.Initialise(arguments["fs"]);
}
else
{
processor.Initialise();
}
processor.Run();
Console.WriteLine("\r\n\n\nPress any key to Exit");
Console.ReadLine();
return (int)ExitCode.Success;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine("\r\n\n\nPress any key to Exit");
Console.ReadLine();
return (int)ExitCode.UnknownError;
}
}
Run方法
public void Run()
{
List<Task> tasks = new List<Task>();
foreach (IFileService fileservice in this.fileServices)
{
Task task = Task.Factory.StartNew((arg) =>
{
IFileService fs = (IFileService)arg;
string msg = $"Processing {fs.ToString()}...";
FileLogger.Write(msg, fs.ToString());
ConsoleLogger.WriteLine(msg);
fs.ProcessFiles();
//fileservice.ReprocessUnMatchedData();
}, fileservice);
tasks.Add(task);
}
Task.WaitAll(tasks.ToArray());
}
每個文件服務,我有調用這個方法裏面:
protected bool ProcessFileRecord<T>() where T : IDataRecord
{
int matched = 0;
int notMatched = 0;
int skipped = 0;
bool result;
object lockObject = new object();
try
{
processTracker = GetTracker();
if (databaseHelper.TrackerFullyProcessed(processTracker))
{
LoggingService.Write("File already processed... Skipping.", LoggingTarget.All, serviceName);
result = true;
}
LoggingService.Write($"\r\nProcessing index file {fileRecord.IndexFileName}", LoggingTarget.File, serviceName);
Parallel.ForEach(
fileRecord.DataRecords,
new ParallelOptions() { MaxDegreeOfParallelism = Thread.CurrentThread.ManagedThreadId },
(item) =>
{
switch ((RecordProcessResult)ProcessRecord(item))
{
case RecordProcessResult.Invalid:
break;
case RecordProcessResult.Matched:
Increment(ref matched);
break;
case RecordProcessResult.NotMatched:
Increment(ref notMatched);
break;
case RecordProcessResult.Skipped:
Increment(ref skipped);
break;
default:
break;
}
lock (lockObject)
{
if ((matched + notMatched + skipped) % 100 == 0)
{
LoggingService.Write($"\tMatched: {matched}\tNot Matched: {notMatched}\tSkipped: {skipped}\t total: {matched + notMatched + skipped}", LoggingTarget.Trace & LoggingTarget.Console, serviceName);
}
}
});
LoggingService.Write($"Total Lines: {matched + notMatched + skipped} \r\nMatched: {matched} \r\nNot Matched: {notMatched} \r\nSkipped: {skipped}", LoggingTarget.All, serviceName);
this.databaseHelper.UpdateTracker(this.processTracker, matched, notMatched);
result = true;
}
catch (Exception ex)
{
LoggingService.Write($"Error processing data file:{fileRecord.IndexFileName}", LoggingTarget.All, serviceName);
LoggingService.Write($"{ex.ExceptionTreeAsString()}", LoggingTarget.All, serviceName);
LoggingService.Write($"Total Lines: {(matched + notMatched + skipped)} \r\nMatched: {matched} \r\nNot Matched: {notMatched} \r\nSkipped: {skipped}", LoggingTarget.All, serviceName);
this.databaseHelper.UpdateTracker(this.processTracker, matched, notMatched);
result = false;
}
return result;
}
謝謝。