2012-01-13 57 views
21

我正在編寫一個C#程序,通過FTP生成並上傳50萬個文件。我想並行處理4個文件,因爲機器有4個內核,生成文件需要更長的時間。是否有可能將以下Powershell示例轉換爲C#?或者有沒有更好的框架,如C#中的Actor框架(如F#MailboxProcessor)?限制C#中並行線程的數量

Powershell example

$maxConcurrentJobs = 3; 

# Read the input and queue it up 
$jobInput = get-content .\input.txt 
$queue = [System.Collections.Queue]::Synchronized((New-Object System.Collections.Queue)) 
foreach($item in $jobInput) 
{ 
    $queue.Enqueue($item) 
} 

# Function that pops input off the queue and starts a job with it 
function RunJobFromQueue 
{ 
    if($queue.Count -gt 0) 
    { 
     $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() 
     Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null 
    } 
} 

# Start up to the max number of concurrent jobs 
# Each job will take care of running the rest 
for($i = 0; $i -lt $maxConcurrentJobs; $i++) 
{ 
    RunJobFromQueue 
} 

更新:
到遠程FTP服務器的連接可能會很慢,所以我想限制FTP上傳處理。

+0

如果要限制並行任務的數量,爲什麼不使用TPL? – 2012-01-13 16:34:05

+1

線程池應該足夠智能以便爲您處理此問題。爲什麼要自己管理它? – 2012-01-13 16:36:37

+3

您可以使用[PLINQ](http://msdn.microsoft.com/en-us/library/dd460688.aspx)並設置[WithDegreeOfParallelism](http://msdn.microsoft.com/en-us/library/ dd383719.aspx)。 – 2012-01-13 16:39:17

回答

5

如果您使用的是.NET 4.0中,您可以使用Parallel library

假如你使用的是Parallel Foreach for instance或者你可以have a look to PLinq 這裏一個comparison between the two

迭代通過量的一半百萬個文件,你可以「並行」的迭代
+0

請證明-1。 – 2012-01-13 16:41:46

+0

這個問題被標記爲C#-4.0,很明顯,他是擴展名和使用.NET 4的一員。單個句子不回答他的問題。 – 2012-01-13 16:44:58

+0

很明顯他使用的是C#4.0,但他並不清楚他熟悉Parallel庫,因此他不會問一個問題。另外,我的回覆也包含或多或少與另一個相同的信息。請請證明-1。 – 2012-01-13 16:51:08

16

任務並行庫是你的朋友在這裏。請參閱this鏈接,其中介紹了您可以使用的內容。基本上框架4,用它可優化這些基本後臺線程池線程到正在運行的機器上的處理器的數量。

也許沿着線的東西:

ParallelOptions options = new ParallelOptions(); 

options.MaxDegreeOfParallelism = 4; 

然後在你的循環是這樣的:

Parallel.Invoke(options, 
() => new WebClient().Upload("http://www.linqpad.net", "lp.html"), 
() => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html")); 
2

基本上你會想爲每個文件創建一個行動或任務上傳,將它們放入列表中,然後處理該列表,限制可並行處理的數量。

My blog post展示瞭如何都與任務,並以行動做到這一點,並提供您可以下載並運行同時看到在行動的示例項目。

隨着使用操作操作

如果,您可以使用內置的.Net Parallel.Invoke功能。這裏我們限制它並行運行最多4個線程。

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => UploadFile(localFile))); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

雖然此選項不支持異步,但我假設您是FileUpload函數,因此您可能需要使用下面的Task示例。

帶任務

使用任務沒有內置功能。但是,您可以使用我在我的博客上提供的那個。

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. 
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      await Task.WhenAll(postTaskTasks.ToArray()); 
     } 
    } 

,然後創建您的任務列表,並調用函數讓他們跑,有說在同一時間最多4個同時的,你可以這樣做:

var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(async() => await UploadFile(localFile))); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4); 

此外,由於這方法支持異步,它不會像使用Parallel.Invoke或Parallel.ForEach那樣阻塞UI線程。

0

我編寫了下面的技術,我使用BlockingCollection作爲線程管理器。實施和處理這項工作非常簡單。 它只是接受任務對象並向阻塞列表中添加一個整數值,將運行線程數增加1.當線程完成時,它將使對象出隊並釋放塊以用於即將到來的任務的添加操作。

 public class BlockingTaskQueue 
     { 
      private BlockingCollection<int> threadManager { get; set; } = null; 
      public bool IsWorking 
      { 
       get 
       { 
        return threadManager.Count > 0 ? true : false; 
       } 
      } 

      public BlockingTaskQueue(int maxThread) 
      { 
       threadManager = new BlockingCollection<int>(maxThread); 
      } 

      public async Task AddTask(Task task) 
      { 
       Task.Run(() => 
       { 
        Run(task); 
       }); 
      } 

      private bool Run(Task task) 
      { 
       try 
       { 
        threadManager.Add(1); 
        task.Start(); 
        task.Wait(); 
        return true; 

       } 
       catch (Exception ex) 
       { 
        return false; 
       } 
       finally 
       { 
        threadManager.Take(); 
       } 

      } 

     }