2011-04-14 76 views
1

我有一個要求,使用SqlBulkCopy將大型csv文件分成幾個不同的數據庫插入。我打算通過兩個單獨的任務來完成此任務,1個用於批量處理CSV文件,另一個用於插入數據庫。作爲一個例子,這裏是我的事情:具有容錯功能的並行生產者/消費者?

public class UberTask 
{ 
    private readonly BlockingCollection<Tuple<string,int>> _store = new BlockingCollection<Tuple<string, int>>(); 

    public void PerformTask() 
    { 
     var notifier = new UINotifier(); 
     Task.Factory.StartNew(() => 
            { 
             for (int i =0; i < 10; i++) 
             { 
              string description = string.Format("Scenario {0}", i); 

              notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description))); 

              // represents reading the CSV file. 
              Thread.Sleep(500); 
              notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description))); 
              _store.Add(new Tuple<string, int>(description, i)); 
             } 
             _store.CompleteAdding(); 
            }); 

     var consumer = Task.Factory.StartNew(() => 
               { 
                foreach (var item in _store.GetConsumingEnumerable()) 
                { 
                 var poppedItem = item; 
                 notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1))); 
                 // represents sending stuff to the database. 
                 Thread.Sleep(1000); 
                } 
               }); 
     consumer.Wait(); 
     Console.WriteLine("complete"); 
    } 
} 

這是配對2套相關任務的好方法嗎?什麼上面的代碼不處理(它需要):

  • 如果表示CSV讀取故障的任務,其他任務需要停止
  • (即使仍然存在_Store項目。)如果表示數據庫的任務插入錯誤,則其他進程可以停止處理。
  • 如果配對任務中的任何一個出現故障,我將需要執行一些操作來回滾數據庫更新(我不擔心如何回滾),這更多的是如何編碼「發生故障配對任務之一,所以我需要做一些整理「。

以上任何幫助將不勝感激!

回答

2

您可以使用異常處理和取消令牌來執行此操作。當流水線階段檢測到錯誤時,它會捕獲它並設置令牌。這將取消其他階段。 finally塊確保完成對CompleteAdding()的調用。這很重要,因爲接收管道階段可能會在等待收集時被阻塞,並且在取消阻止之前不會處理取消。

您還希望顯示集合中任何未處理的對象,或者在您的案例中,在管道階段完成時(在finally中)和/或整個管道關閉時清理DB連接。

這裏的,這是否一個流水線階段的一個例子:

static void LoadPipelinedImages(IEnumerable<string> fileNames, 
            string sourceDir, 
            BlockingCollection<ImageInfo> original, 
            CancellationTokenSource cts) 
    { 
     // ... 
     var token = cts.Token; 
     ImageInfo info = null; 
     try 
     { 
      foreach (var fileName in fileNames) 
      { 
       if (token.IsCancellationRequested) 
        break; 
       info = LoadImage(fileName, ...); 
       original.Add(info, token); 
       info = null; 
      }     
     } 
     catch (Exception e) 
     { 
      // in case of exception, signal shutdown to other pipeline tasks 
      cts.Cancel(); 
      if (!(e is OperationCanceledException)) 
       throw; 
     } 
     finally 
     { 
      original.CompleteAdding(); 
      if (info != null) info.Dispose(); 
     } 
    } 

整體管道代碼如下所示。它還支持通過設置取消標記從外部(從UI)取消管道。

static void RunPipelined(IEnumerable<string> fileNames, 
          string sourceDir, 
          int queueLength, 
          Action<ImageInfo> displayFn, 
          CancellationTokenSource cts) 
    { 
     // Data pipes 
     var originalImages = new BlockingCollection<ImageInfo>(queueLength); 
     var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength); 
     var filteredImages = new BlockingCollection<ImageInfo>(queueLength); 
     try 
     { 
      var f = new TaskFactory(TaskCreationOptions.LongRunning, 
            TaskContinuationOptions.None); 
      // ... 

      // Start pipelined tasks 
      var loadTask = f.StartNew(() => 
        LoadPipelinedImages(fileNames, sourceDir, 
             originalImages, cts)); 

      var scaleTask = f.StartNew(() => 
        ScalePipelinedImages(originalImages, 
             thumbnailImages, cts)); 

      var filterTask = f.StartNew(() => 
        FilterPipelinedImages(thumbnailImages, 
             filteredImages, cts)); 

      var displayTask = f.StartNew(() => 
        DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), 
         ... cts)); 

      Task.WaitAll(loadTask, scaleTask, filterTask, displayTask); 
     } 
     finally 
     { 
      // in case of exception or cancellation, there might be bitmaps 
      // that need to be disposed. 
      DisposeImagesInQueue(originalImages); 
      DisposeImagesInQueue(thumbnailImages); 
      DisposeImagesInQueue(filteredImages);     
     } 
    } 

對於一個完整的示例看到這裏下載該管道例如:

http://parallelpatterns.codeplex.com/releases/view/50473

這裏討論:

http://msdn.microsoft.com/en-us/library/ff963548.aspx

+0

感謝您的詳細回答阿德! – primalgeek 2011-04-16 08:56:39