2011-10-17 36 views
0

我有一個相當大的XML文件(大約1-2GB)。生產者使用TPL的消費者模型,.NET 4.0中的任務

要求是將XML數據保存到數據庫中。目前這是通過3個步驟實現的。

  1. 閱讀用更少的內存佔用大文件儘可能
  2. 從XML數據
  3. 存儲從創建實體的數據使用SqlBulkCopy的數據庫中創建的實體。

爲了獲得更好的性能,我想創建一個生產者 - 消費者模型,其中生產者創建一組實體,說一批10K並將其添加到一個隊列中。消費者應該從隊列中取出一批實體,並使用sqlbulkcopy將其保存到數據庫中。

感謝, 戈庫爾

void Main() 
{ 
    int iCount = 0; 
    string fileName = @"C:\Data\CatalogIndex.xml"; 

    DateTime startTime = DateTime.Now; 
    Console.WriteLine("Start Time: {0}", startTime); 
    FileInfo fi = new FileInfo(fileName); 
    Console.WriteLine("File Size:{0} MB", fi.Length/1048576.0); 

/* I want to change this loop to create a producer consumer pattern here to process the data parallel-ly 
*/ 
    foreach (var element in StreamElements(fileName,"title")) 
      { 
       iCount++; 
      } 

      Console.WriteLine("Count: {0}", iCount); 
      Console.WriteLine("End Time: {0}, Time Taken:{1}", DateTime.Now, DateTime.Now - startTime); 
     } 

    private static IEnumerable<XElement> StreamElements(string fileName, string elementName) 
    { 
     using (var rdr = XmlReader.Create(fileName)) 
     { 
      rdr.MoveToContent(); 
      while (!rdr.EOF) 
      { 
       if ((rdr.NodeType == XmlNodeType.Element) && (rdr.Name == elementName)) 
       { 
        var e = XElement.ReadFrom(rdr) as XElement; 
        yield return e; 
       } 
       else 
       { 
        rdr.Read(); 
       } 
      } 
      rdr.Close(); 
     } 
    } 
+0

我嘗試使用線程本地存儲,但截至目前,我只用剛打電話來創建實體的方法的單線程方式,然後將它們同步持續。 – Gokul

回答

4

難道這就是你想幹什麼?

void Main() 
    { 
     const int inputCollectionBufferSize = 1024; 
     const int bulkInsertBufferCapacity = 100; 
     const int bulkInsertConcurrency = 4; 

     BlockingCollection<object> inputCollection = new BlockingCollection<object>(inputCollectionBufferSize); 

     Task loadTask = Task.Factory.StartNew(() => 
     { 
      foreach (object nextItem in ReadAllElements(...)) 
      { 
       // this will potentially block if there are already enough items 
       inputCollection.Add(nextItem); 
      } 

      // mark this collection as done 
      inputCollection.CompleteAdding(); 
     }); 

     Action parseAction =() => 
     { 
      List<object> bulkInsertBuffer = new List<object>(bulkInsertBufferCapacity); 

      foreach (object nextItem in inputCollection.GetConsumingEnumerable()) 
      { 
       if (bulkInsertBuffer.Length == bulkInsertBufferCapacity) 
       { 
        CommitBuffer(bulkInsertBuffer); 
        bulkInsertBuffer.Clear(); 
       } 

       bulkInsertBuffer.Add(nextItem); 
      } 
     }; 

     List<Task> parseTasks = new List<Task>(bulkInsertConcurrency); 

     for (int i = 0; i < bulkInsertConcurrency; i++) 
     { 
      parseTasks.Add(Task.Factory.StartNew(parseAction)); 
     } 

     // wait before exiting 
     loadTask.Wait(); 
     Task.WaitAll(parseTasks.ToArray()); 
    } 
+0

謝謝你的僞代碼。這是一個好的開始。我們如何讓CommitBuffer方法在X個線程中並行執行。由於以耗時的操作將數據保存到數據庫,我想將其作爲具有可配置線程數的多線程操作來執行。我會嘗試執行此代碼並在此處進行更新。 – Gokul

+0

@ user943141 - 這不會有太大的區別。如果你真的想,你可以多次啓動parseTasks。但是,數據庫操作是一項沉重的IO可靠操作。由於IO是瓶頸並且同步很重,您可能只會使用多線程減慢速度 – Polity

+0

請參閱本文http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis .aspx Alberto創建了一個PC模型來執行批量數據插入。在這種情況下,消費者是多線程的並執行sqlbulkcopy。 – Gokul