2016-07-25 58 views
4

我已經通過Insert 2 million rows into SQL Server quickly鏈接,發現我可以通過使用批量插入來完成此操作。所以我試圖創建數據表(如下面的代碼),而是因爲這是一個巨大的文件(超過300K行)我得到在我的代碼的OutOfMemoryEexception如何從文本文件中讀取數百萬行並快速插入表格

string line; 
DataTable data = new DataTable(); 
string[] columns = null;  
bool isInserted = false;   

using (TextReader tr = new StreamReader(_fileName, Encoding.Default)) 
{ 
    if (columns == null) 
    { 
     line = tr.ReadLine(); 
     columns = line.Split(','); 
    } 

    for (int iColCount = 0; iColCount < columns.Count(); iColCount++) 
    { 
     data.Columns.Add("Column" + iColCount, typeof(string)); 
    }      

    string[] columnVal; 

    while ((line = tr.ReadLine()) != null) 
    { 
     columnVal = line.Split(','); // OutOfMemoryException throwing in this line 
     data.Rows.Add(columnVal); 
    } 
} 

經過長期的工作,我修改了我的代碼爲以下,但隨後還我收到OutOfMemoryException異常在添加行的時間到數據表

DataTable data = new DataTable(); 
string[] columns = null; 
var line = string.Empty; 
using (TextReader tr = new StreamReader(_fileName, Encoding.Default)) 
{ 
    if (columns == null) 
    { 
     line = tr.ReadLine(); 
     columns = line.Split(','); 
    } 

    for (int iColCount = 0; iColCount < columns.Count(); iColCount++) 
    { 
     data.Columns.Add("Column" + iColCount, typeof(string)); 
    } 
    } 

    // Split the rows in 20000 rows in different list 

    var _fileList = File.ReadLines(_fileName, Encoding.Default).ToList(); 
    var splitChunks = new List<List<string>>(); 
    splitChunks = SplitFile(_fileList, 20000); 

Parallel.ForEach(splitChunks, lstChunks => 
{ 
    foreach (var rows in lstChunks) 
    { 
    string[] lineFields = rows.Split(','); 
    DataRow row = datatbl.NewRow(); 
    for (int iCount = 0; iCount < lineFields.Count(); iCount++) 
    { 
     row[iCount] = lineFields[iCount] == string.Empty ? "" : lineFields[iCount].ToString(); 
    } 
    datatbl.Rows.Add(row); 
    } 
}); 

我能爲下一級爲下面的代碼做批量插入:

SqlConnection SqlConnectionObj = GetSQLConnection(); 
SqlBulkCopy bulkCopy = new SqlBulkCopy(SqlConnectionObj, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null); 
bulkCopy.DestinationTableName = "TempTable"; 
bulkCopy.WriteToServer(data); 

文件包含以下類型的數據

4714,1370的,AUSRICHTEN MASCHINELL

4870,1370,PLATTE STECKEN

0153,1900,填縫槍

0154,1900,新的終結者

0360,1470,MU 186 MACCH。 X LAV。 S/A ASTE PS174

9113-H22,1970,MC鑽頭

代碼需要此轉換成6行和3列。

有沒有更快的方法來實現上述功能來讀取文件並創建用於批量插入的數據表?所以我不應該從索引異常中獲取內存。

在此先感謝。

+0

這是否需要以編程方式完成?如果這只是一個關閉,你可以使用SSMS工具 – Mark

+0

你可以分區嗎?像讀取數千行,批量插入這些數據,然後重新使用數據表爲接下來的數千行等等。 – dlatikay

+0

是的,它需要,因爲我有更多的代碼來根據env從不同的服務器獲取文件。 (DEV,TST,PROD)。並有更多的功能。 – Rocky

回答

3

您得到OutOfMemoryException的原因是因爲你是 創建一個內存中的數據表,並嘗試插入300K行 進去

這是一個很大的數據放在內存。

取而代之,您應該做的是從文本文件中讀取的每一定數量的行 - 您需要將其插入到數據庫中。

如何做到這一點取決於您,您可以使用SQL或批量複製 - 但請記住,您無法讀取整個文本文件並將其保存在內存中,因此請以塊爲單位。

+0

看來你還在將所有數據加載到一個數據表對象中,如果我錯了,請糾正我,因爲我找不到你正在實例化datatbl。要麼在每個foreach事件中創建一個新的數據表,要麼在每次插入後刪除parallel.foreach並清除表數據。 – gilmishal

+0

就我個人而言,我發現使用SQL查詢爲了將數據插入表中可以更簡單快捷 - 它也可能更快,因爲您正在跳過使用數據表對象 - 但您的方式也可以工作。 – gilmishal

0

我發現忘記了一個DataTable,並且逐行使用普通的舊SQLClient的速度更快。也更簡單。這也擊敗了流式SQL函數,該函數被認爲是在SQL Server數據庫中獲取數據的最快方式。

試試看,並測量速度,看看它是否足夠快爲你。如果不是的話,你總是可以嘗試重新格式化文件(如果需要的話),讓SQL Server使用它的Bulk Insert爲你做工作。

1

使用SqlBulkCopy.WriteToServerIDataReader的解決方案。我正在使用CSV,但我希望能夠輕鬆修改其他類型。 SqlBulkCopyIDateReader只使用了3件事,我們必須實現它們:

  • public int FieldCount {get; }
  • public bool Read()
  • public object GetValue(int i)

所有其他屬性和方法可以是未實現的。有趣的文章約SqlBulkCopy。完整代碼:https://dotnetfiddle.net/giG3Ai。這裏是切割版本:

namespace SqlBulkCopy 
{ 
    using System; 
    using System.Collections.Generic; 
    using System.IO; 
    using System.Diagnostics; 
    using System.Data; 
    using System.Data.SqlClient; 

    public class CsvReader : IDataReader 
    { 
     private readonly char CSV_DELIMITER = ','; 

     private readonly StreamReader _sr; 
     private readonly Dictionary<string, Func<string, object>> _csv2SqlType; 
     private readonly string[] _headers; 

     private string _line; 
     private string[] _values; 

     public int FieldCount { get { return _headers.Length; } } 

     public CsvReader(string filePath, Dictionary<string, Func<string, object>> csvColumn2SqlTypeDict) 
     { 
      if (string.IsNullOrEmpty(filePath)) 
       throw new ArgumentException("is null or empty", "filePath"); 
      if (!System.IO.File.Exists(filePath)) 
       throw new IOException(string.Format("{0} doesn't exist or access denied", filePath)); 
      if (csvColumn2SqlTypeDict == null) 
       throw new ArgumentNullException("csvColumn2SqlTypeDict"); 

      _sr = new StreamReader(filePath); 
      _csv2SqlType = csvColumn2SqlTypeDict; 
      _headers = ReadHeaders(); 
      ValidateHeaders(); 
     } 
     public object GetValue(int i) 
     { 
      // Get column value 
      var colValue = _values[i]; 
      // Get column name 
      var colName = _headers[i]; 
      // Try to convert to SQL type 
      try { return _csv2SqlType[colName](colValue); } 
      catch { return null; } 
     } 
     public bool Read() 
     { 
      if (_sr.EndOfStream) return false; 

      _line = _sr.ReadLine(); 
      _values = _line.Split(CSV_DELIMITER); 
      // If row is invalid, go to next row 
      if (_values.Length != _headers.Length) 
       return Read(); 
      return true; 
     } 
     public void Dispose() 
     { 
      _sr.Dispose(); 
     } 
     private void ValidateHeaders() 
     { 
      if (_headers.Length != _csv2SqlType.Keys.Count) 
       throw new InvalidOperationException(string.Format("Read {0} columns, but csv2SqlTypeDict contains {1} columns", _headers.Length, _csv2SqlType.Keys)); 
      foreach (var column in _headers) 
      { 
       if (!_csv2SqlType.ContainsKey(column)) 
        throw new InvalidOperationException(string.Format("There is no convertor for column '{0}'", column)); 
      } 
     } 
     private string[] ReadHeaders() 
     { 
      var headerLine = _sr.ReadLine(); 
      if (string.IsNullOrEmpty(headerLine)) 
       throw new InvalidDataException("There is no header in CSV!"); 
      var headers = headerLine.Split(CSV_DELIMITER); 
      if (headers.Length == 0) 
       throw new InvalidDataException("There is no header in CSV after Split!"); 
      return headers; 
     } 
    } 
    public class Program 
    {   
     public static void Main(string[] args) 
     { 
      // Converter from CSV columns to SQL columns 
      var csvColumn2SqlTypeDict = new Dictionary<string, Func<string, object>> 
      { 
       { "int", (s) => Convert.ToInt32(s) }, 
       { "str", (s) => s }, 
       { "double", (s) => Convert.ToDouble(s) }, 
       { "date", (s) => Convert.ToDateTime(s) }, 
      }; 
      Stopwatch sw = Stopwatch.StartNew(); 
      try 
      { 
       // example.csv 
       /*** 
        int,str,double,date 
        1,abcd,2.5,15.04.2002 
        2,dab,2.7,15.04.2007 
        3,daqqb,4.7,14.04.2007 
       ***/ 
       using (var csvReader = new CsvReader("example.csv", csvColumn2SqlTypeDict)) 
       { 
        // TODO!!! Modify to your Connection string 
        var cs = @"Server=localhost\SQLEXPRESS;initial catalog=TestDb;Integrated Security=true"; 
        using (var loader = new SqlBulkCopy(cs, SqlBulkCopyOptions.Default)) 
        { 
         // TODO Modify to your Destination table 
         loader.DestinationTableName = "Test"; 
         // Write from csvReader to database 
         loader.WriteToServer(csvReader); 
        } 
       } 
      } 
      catch(Exception ex) 
      { 
       Console.WriteLine("Got an exception: {0}", ex); 
       Console.WriteLine("Press 'Enter' to quit"); 
       Console.ReadLine(); 
       return; 
      } 
      finally { sw.Stop(); } 
      Console.WriteLine("Data has been written in {0}", sw.Elapsed); 
      Console.WriteLine("Press 'Enter' to quit"); 
      Console.ReadLine(); 
     } 
     private static void ShowCsv(IDataReader dr) 
     { 
      int i = 0; 
      while (dr.Read()) 
      { 
       Console.WriteLine("Row# {0}", i); 
       for (int j = 0; j < dr.FieldCount; j++) 
       { 
        Console.WriteLine("{0} => {1}", j, dr.GetValue(j)); 
       } 
       i++; 
      } 
     } 
    } 
} 
相關問題