2011-07-06 112 views

Answers

-9

You can do a foreach over your records.

foreach (var record in records) { 
    db.Save(record); 
} 
+1

Does this only create a single database hit? – Dan

+0

No, it will hit the database once per record. How do you expect to do it in a single database hit? Unless you just want to generate one update statement and execute it. – Schotime

11

I tried two different methods for inserting a large number of rows faster than the default Insert (which is quite slow when you have many rows).

1) Making up a List&lt;T&gt; with the POCOs first and then inserting them in a loop (and in one transaction):

using (var tr = PetaPocoDb.GetTransaction()) 
{ 
    foreach (var record in listOfRecords) 
    { 
     PetaPocoDb.Insert(record); 
    } 
    tr.Complete(); 
} 

2) SqlBulkCopy a DataTable:

var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock); 
bulkCopy.DestinationTableName = "SomeTable"; 
bulkCopy.WriteToServer(dt); 

To convert my List&lt;T&gt; to a DataTable I used Marc Gravell's Convert generic List/Enumerable to DataTable? function, which worked for me ootb (after I rearranged the order of the POCO properties so they were exactly the same as the table's).
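For reference, a minimal reflection-based List&lt;T&gt;-to-DataTable conversion (a rough sketch in the spirit of Marc Gravell's answer, not his exact code — the helper name is made up) could look like:

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Reflection;

public static class DataTableHelper
{
    // Hypothetical helper: builds a DataTable whose columns mirror T's public properties.
    // Column order follows property declaration order, so — as noted above — the POCO
    // properties must be declared in the same order as the target table's columns.
    public static DataTable ToDataTable<T>(IEnumerable<T> items)
    {
        var props = typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance);
        var table = new DataTable(typeof(T).Name);

        foreach (var prop in props)
        {
            // Unwrap Nullable<T> so the DataColumn gets the underlying value type
            var colType = Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType;
            table.Columns.Add(prop.Name, colType);
        }

        foreach (var item in items)
        {
            var row = table.NewRow();
            foreach (var prop in props)
                row[prop.Name] = prop.GetValue(item, null) ?? DBNull.Value;
            table.Rows.Add(row);
        }
        return table;
    }
}
```

The resulting DataTable can then be handed straight to `bulkCopy.WriteToServer(dt)` as in the snippet below.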

SqlBulkCopy was the fastest, 50% or so faster than the transaction method in my (quick) performance tests with ~1000 rows.

HTH

+0

I think your first method still goes to the database for every insert – IvoTops

+0

Yes, it would be interesting to compare the speed with a combined-insert TSQL. In my case I stopped digging for more perf when I noticed I was only 50% slower than bulk copy. – joeriks

11

In my case, I took advantage of the Database.Execute() method.

I created a SQL parameter that held the first part of my insert:

var sql = new Sql("insert into myTable(Name, Age, Gender) values"); 

for (int i = 0; i < pocos.Count; ++i) 
{ 
    var p = pocos[i]; 
    sql.Append("(@0, @1, @2)", p.Name, p.Age, p.Gender); 
    if (i != pocos.Count - 1) 
        sql.Append(","); 
} 

Database.Execute(sql); 
+0

Nice workaround! – Zelid

+1

It generates a StackOverflow in the PetaPoco "Sql.Append()" method (if there are a lot of items in the list). – Zelid

+0

I posted my own solution. – Zelid

7

Inserting in one SQL query is much faster.

Here is a custom method for the PetaPoco.Database class that adds the ability to bulk insert any collection:

public void BulkInsertRecords<T>(IEnumerable<T> collection) 
{ 
    try 
    { 
        OpenSharedConnection(); 
        using (var cmd = CreateCommand(_sharedConnection, "")) 
        { 
            var pd = Database.PocoData.ForType(typeof(T)); 
            var tableName = EscapeTableName(pd.TableInfo.TableName); 
            string cols = string.Join(", ", (from c in pd.QueryColumns select tableName + "." + EscapeSqlIdentifier(c)).ToArray()); 
            var pocoValues = new List<string>(); 
            var index = 0; 
            foreach (var poco in collection) 
            { 
                var values = new List<string>(); 
                foreach (var i in pd.Columns) 
                { 
                    values.Add(string.Format("{0}{1}", _paramPrefix, index++)); 
                    AddParam(cmd, i.Value.GetValue(poco), _paramPrefix); 
                } 
                pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")"); 
            } 
            var sql = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues)); 
            cmd.CommandText = sql; 
            cmd.ExecuteNonQuery(); 
        } 
    } 
    finally 
    { 
        CloseSharedConnection(); 
    } 
} 
+0

Very nice, it would be interesting to compare perfs with the transaction method I described (I believe yours is faster, but by how much?) Also - afaik - you should get additional performance if you wrap the inserts in a transaction (http://blog.staticvoid.co.nz/2012/4/26/making_dapper_faster_with_transactions) – joeriks

2

Here is the code for BulkInsert that you can add to v5.01 PetaPoco.cs

You can paste it somewhere close to the regular Insert, around line 1098

You give it an IEnumerable of POCOs and it will send them to the database

in batches. The code is 90% from the regular Insert.

I do not have a performance comparison, let me know :)
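Stripped of the PetaPoco specifics, the batching loop in the method below is a plain Skip/Take paging pattern, which can be sketched on its own (names here are illustrative, not part of PetaPoco):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchHelper
{
    // Sketch of the Skip/Take batching pattern used by BulkInsert below.
    // processBatch stands in for building and executing one multi-row INSERT command.
    public static void InBatches<T>(IEnumerable<T> items, int batchSize, Action<IList<T>> processBatch)
    {
        int count = 0;
        while (true)
        {
            var batch = items.Skip(count).Take(batchSize).ToList();
            if (batch.Count == 0) break;  // mirrors the `if (cmd.CommandText == "") break;` check
            processBatch(batch);
            count += batchSize;
        }
    }
}
```

Note that `Skip`/`Take` re-enumerates the source on every pass, so for a lazily evaluated IEnumerable it can be worth materializing it with `ToList()` once up front.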

/// <summary> 
    /// Bulk inserts multiple rows to SQL 
    /// </summary> 
    /// <param name="tableName">The name of the table to insert into</param> 
    /// <param name="primaryKeyName">The name of the primary key column of the table</param> 
    /// <param name="autoIncrement">True if the primary key is automatically allocated by the DB</param> 
    /// <param name="pocos">The POCO objects that specify the column values to be inserted</param> 
    /// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>   
    public void BulkInsert(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos, int batchSize = 25) 
    { 
     try 
     { 
      OpenSharedConnection(); 
      try 
      { 
       using (var cmd = CreateCommand(_sharedConnection, "")) 
       { 
        var pd = PocoData.ForObject(pocos.First(), primaryKeyName); 
        // Create list of columnnames only once 
        var names = new List<string>(); 
        foreach (var i in pd.Columns) 
        { 
         // Don't insert result columns 
         if (i.Value.ResultColumn) 
          continue; 

         // Don't insert the primary key (except under oracle where we need to bring in the next sequence value) 
         if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0) 
         { 
          // Setup auto increment expression 
          string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo); 
          if (autoIncExpression != null) 
          { 
           names.Add(i.Key); 
          } 
          continue; 
         } 
         names.Add(_dbType.EscapeSqlIdentifier(i.Key)); 
        } 
        var namesArray = names.ToArray(); 

        var values = new List<string>(); 
        int count = 0; 
        do 
        { 
         cmd.CommandText = ""; 
         cmd.Parameters.Clear(); 
         var index = 0; 
         foreach (var poco in pocos.Skip(count).Take(batchSize)) 
         { 
          values.Clear(); 
          foreach (var i in pd.Columns) 
          { 
           // Don't insert result columns 
           if (i.Value.ResultColumn) continue; 

           // Don't insert the primary key (except under oracle where we need to bring in the next sequence value) 
           if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0) 
           { 
            // Setup auto increment expression 
            string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo); 
            if (autoIncExpression != null) 
            { 
             values.Add(autoIncExpression); 
            } 
            continue; 
           } 

           values.Add(string.Format("{0}{1}", _paramPrefix, index++)); 
           AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo); 
          } 

          string outputClause = String.Empty; 
          if (autoIncrement) 
          { 
           outputClause = _dbType.GetInsertOutputClause(primaryKeyName); 
          } 

          cmd.CommandText += string.Format("INSERT INTO {0} ({1}){2} VALUES ({3})", _dbType.EscapeTableName(tableName), 
                  string.Join(",", namesArray), outputClause, string.Join(",", values.ToArray())); 
         } 
         // Are we done? 
         if (cmd.CommandText == "") break; 
         count += batchSize; 
         DoPreExecute(cmd); 
         cmd.ExecuteNonQuery(); 
         OnExecutedCommand(cmd); 
        } 
        while (true); 

       } 
      } 
      finally 
      { 
       CloseSharedConnection(); 
      } 
     } 
     catch (Exception x) 
     { 
      if (OnException(x)) 
       throw; 
     } 
    } 


    /// <summary> 
    /// Performs a SQL Bulk Insert 
    /// </summary> 
    /// <param name="pocos">The POCO objects that specify the column values to be inserted</param>   
    /// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>   
    public void BulkInsert(IEnumerable<object> pocos, int batchSize = 25) 
    { 
     if (!pocos.Any()) return; 
     var pd = PocoData.ForType(pocos.First().GetType()); 
     BulkInsert(pd.TableInfo.TableName, pd.TableInfo.PrimaryKey, pd.TableInfo.AutoIncrement, pocos, batchSize); 
    } 
3

Here is an extension of taylonr's very clever idea to use the SQL technique of inserting multiple rows via INSERT INTO tab(col1, col2) OUTPUT inserted.[ID] VALUES (@0, @1), (@2, @3), (@4, @5), ..., (@n-1, @n), applied to PetaPoco's BulkInsert method.

It also returns the auto-increment (Identity) values of the inserted records, which I don't believe happens in IvoTops' implementation.

Note: SQL Server 2012 (and below) is limited to 2,100 parameters per query. (This is likely the source of the stack overflow exception referenced by Zelid's comment.) You will need to manually split your batches based on the number of columns that are not decorated as Ignore or Result. For example, a POCO with 21 columns should be sent in batch sizes of 99, or (2100 - 1) / 21. I may refactor this to dynamically split batches based on this SQL Server limit; however, you will always see the best results by managing the batch size outside of this method.
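The batch-size arithmetic described above can be captured in a tiny helper (the method name and the headroom of one reserved parameter are assumptions made for illustration, matching the (2100 - 1) / 21 example):

```csharp
public static class ParameterLimit
{
    // Hypothetical helper: the largest batch size that stays under SQL Server's
    // 2,100-parameters-per-query limit for a POCO with `columnsPerPoco`
    // insertable (non-Ignore, non-Result) columns.
    public static int MaxBatchSize(int columnsPerPoco, int parameterLimit = 2100)
    {
        // Reserve one parameter as headroom, as in the (2100 - 1) / 21 example above.
        return (parameterLimit - 1) / columnsPerPoco;
    }
}

// e.g. ParameterLimit.MaxBatchSize(21) == 99
```

Callers would split their POCO collection into chunks of at most this size before invoking BulkInsert.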

This method showed approximately a 50% gain in execution time over my previous technique of using a shared connection in a single transaction for all inserts.

This is one area where Massive really shines - Massive has a Save(params object[] things) that builds an array of IDbCommands and executes each one on a shared connection. It works out of the box and doesn't run into the parameter limit.

/// <summary> 
/// Performs an SQL Insert against a collection of pocos 
/// </summary> 
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted. Assumes that every POCO is of the same type.</param> 
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns> 
/// <remarks> 
///  NOTE: As of SQL Server 2012, there is a limit of 2100 parameters per query. This limitation does not seem to apply on other platforms, so 
///   this method will allow more than 2100 parameters. See http://msdn.microsoft.com/en-us/library/ms143432.aspx 
///  The name of the table, its primary key and whether it's an auto-allocated primary key are retrieved from the attributes of the first POCO in the collection 
/// </remarks> 
public object[] BulkInsert(IEnumerable<object> pocos) 
{ 
    Sql sql; 
    IList<PocoColumn> columns = new List<PocoColumn>(); 
    IList<object> parameters; 
    IList<object> inserted; 
    PocoData pd; 
    Type primaryKeyType; 
    object template; 
    string commandText; 
    string tableName; 
    string primaryKeyName; 
    bool autoIncrement; 


    if (null == pocos) 
     return new object[] {}; 

    template = pocos.First<object>(); 

    if (null == template) 
     return null; 

    pd = PocoData.ForType(template.GetType()); 
    tableName = pd.TableInfo.TableName; 
    primaryKeyName = pd.TableInfo.PrimaryKey; 
    autoIncrement = pd.TableInfo.AutoIncrement; 

    try 
    { 
     OpenSharedConnection(); 
     try 
     { 
      var names = new List<string>(); 
      var values = new List<string>(); 
      var index = 0; 
      foreach (var i in pd.Columns) 
      { 
       // Don't insert result columns 
       if (i.Value.ResultColumn) 
        continue; 

       // Don't insert the primary key (except under oracle where we need to bring in the next sequence value) 
       if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0) 
       { 
        primaryKeyType = i.Value.PropertyInfo.PropertyType; 

        // Setup auto increment expression 
        string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo); 
        if (autoIncExpression != null) 
        { 
         names.Add(i.Key); 
         values.Add(autoIncExpression); 
        } 
        continue; 
       } 

       names.Add(_dbType.EscapeSqlIdentifier(i.Key)); 
       values.Add(string.Format("{0}{1}", _paramPrefix, index++)); 
       columns.Add(i.Value); 
      } 

      string outputClause = String.Empty; 
      if (autoIncrement) 
      { 
       outputClause = _dbType.GetInsertOutputClause(primaryKeyName); 
      } 

      commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES", 
          _dbType.EscapeTableName(tableName), 
          string.Join(",", names.ToArray()), 
          outputClause 
          ); 

      sql = new Sql(commandText); 
      parameters = new List<object>(); 
      string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")"); 
      bool isFirstPoco = true; 

      foreach (object poco in pocos) 
      { 
       parameters.Clear(); 
       foreach (PocoColumn column in columns) 
       { 
        parameters.Add(column.GetValue(poco)); 
       } 

       sql.Append(valuesText, parameters.ToArray<object>()); 

       if (isFirstPoco) 
       { 
        valuesText = "," + valuesText; 
        isFirstPoco = false; 
       } 
      } 

      inserted = new List<object>(); 

      using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments)) 
      { 
       if (!autoIncrement) 
       { 
        DoPreExecute(cmd); 
        cmd.ExecuteNonQuery(); 
        OnExecutedCommand(cmd); 

        PocoColumn pkColumn; 
        if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn)) 
        { 
         foreach (object poco in pocos) 
         { 
          inserted.Add(pkColumn.GetValue(poco)); 
         } 
        } 

        return inserted.ToArray<object>(); 
       } 

       // BUG: the following line reportedly causes duplicate inserts; need to confirm 
       //object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName); 

       using(var reader = cmd.ExecuteReader()) 
       { 
        while (reader.Read()) 
        { 
         inserted.Add(reader[0]); 
        } 
       } 

       object[] primaryKeys = inserted.ToArray<object>(); 

       // Assign the ID back to the primary key property 
       if (primaryKeyName != null) 
       { 
        PocoColumn pc; 
        if (pd.Columns.TryGetValue(primaryKeyName, out pc)) 
        { 
         index = 0; 
         foreach(object poco in pocos) 
         { 
          pc.SetValue(poco, pc.ChangeType(primaryKeys[index])); 
          index++; 
         } 
        } 
       } 

       return primaryKeys; 
      } 
     } 
     finally 
     { 
      CloseSharedConnection(); 
     } 
    } 
    catch (Exception x) 
    { 
     if (OnException(x)) 
      throw; 
     return null; 
    } 
} 
+0

[Another answer](http://stackoverflow.com/a/17262788/38360) pointed out a serious bug in the auto-increment path here that caused every record to be inserted twice - but this one got everything else right. All I needed to do was delete the following line: 'object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);'. I'll go ahead and edit your post, but you should check and make sure that line is safe to delete. – Aaronaught

+0

Hi @Aaronaught, thanks for the heads up. I've commented out that line until I can get access to a Windows box with VS to test. The bug doesn't surprise me, as the db I created this code against is fairly forgiving of duplicate inserts. –

3

Here is an updated version of Steve Jansen's answer that splits into chunks of at most 2100 parameters

I commented out the following code because it produced duplicates in the database...

   //using (var reader = cmd.ExecuteReader()) 
       //{ 
       // while (reader.Read()) 
       // { 
       //  inserted.Add(reader[0]); 
       // } 
       //} 

更新的代碼

/// <summary> 
    /// Performs an SQL Insert against a collection of pocos 
    /// </summary> 
    /// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted. Assumes that every POCO is of the same type.</param> 
    /// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns> 
    public object BulkInsert(IEnumerable<object> pocos) 
    { 
     Sql sql; 
     IList<PocoColumn> columns = new List<PocoColumn>(); 
     IList<object> parameters; 
     IList<object> inserted; 
     PocoData pd; 
     Type primaryKeyType; 
     object template; 
     string commandText; 
     string tableName; 
     string primaryKeyName; 
     bool autoIncrement; 

     int maxBulkInsert; 

     if (null == pocos) 
     { 
      return new object[] { }; 
     } 

     template = pocos.First<object>(); 

     if (null == template) 
     { 
      return null; 
     } 

     pd = PocoData.ForType(template.GetType()); 
     tableName = pd.TableInfo.TableName; 
     primaryKeyName = pd.TableInfo.PrimaryKey; 
     autoIncrement = pd.TableInfo.AutoIncrement; 

     //Calculate the maximum chunk size 
     maxBulkInsert = 2100/pd.Columns.Count; 
     IEnumerable<object> pacosToInsert = pocos.Take(maxBulkInsert); 
     IEnumerable<object> pacosremaining = pocos.Skip(maxBulkInsert); 

     try 
     { 
      OpenSharedConnection(); 
      try 
      { 
       var names = new List<string>(); 
       var values = new List<string>(); 
       var index = 0; 

       foreach (var i in pd.Columns) 
       { 
        // Don't insert result columns 
        if (i.Value.ResultColumn) 
         continue; 

        // Don't insert the primary key (except under oracle where we need to bring in the next sequence value) 
        if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0) 
        { 
         primaryKeyType = i.Value.PropertyInfo.PropertyType; 

         // Setup auto increment expression 
         string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo); 
         if (autoIncExpression != null) 
         { 
          names.Add(i.Key); 
          values.Add(autoIncExpression); 
         } 
         continue; 
        } 

        names.Add(_dbType.EscapeSqlIdentifier(i.Key)); 
        values.Add(string.Format("{0}{1}", _paramPrefix, index++)); 
        columns.Add(i.Value); 
       } 

       string outputClause = String.Empty; 
       if (autoIncrement) 
       { 
        outputClause = _dbType.GetInsertOutputClause(primaryKeyName); 
       } 

       commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES", 
           _dbType.EscapeTableName(tableName), 
           string.Join(",", names.ToArray()), 
           outputClause 
           ); 

       sql = new Sql(commandText); 
       parameters = new List<object>(); 
       string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")"); 
       bool isFirstPoco = true; 
       var parameterCounter = 0; 

       foreach (object poco in pacosToInsert) 
       { 
        parameterCounter++; 
        parameters.Clear(); 

        foreach (PocoColumn column in columns) 
        { 
         parameters.Add(column.GetValue(poco)); 
        } 

        sql.Append(valuesText, parameters.ToArray<object>()); 

        if (isFirstPoco && pocos.Count() > 1) 
        { 
         valuesText = "," + valuesText; 
         isFirstPoco = false; 
        } 
       } 

       inserted = new List<object>(); 

       using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments)) 
       { 
        if (!autoIncrement) 
        { 
         DoPreExecute(cmd); 
         cmd.ExecuteNonQuery(); 
         OnExecutedCommand(cmd); 

         PocoColumn pkColumn; 
         if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn)) 
         { 
          foreach (object poco in pocos) 
          { 
           inserted.Add(pkColumn.GetValue(poco)); 
          } 
         } 

         return inserted.ToArray<object>(); 
        } 

        object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName); 

        if (pacosremaining.Any()) 
        { 
         return BulkInsert(pacosremaining); 
        } 

        return id; 

        //using (var reader = cmd.ExecuteReader()) 
        //{ 
        // while (reader.Read()) 
        // { 
        //  inserted.Add(reader[0]); 
        // } 
        //} 

        //object[] primaryKeys = inserted.ToArray<object>(); 

        //// Assign the ID back to the primary key property 
        //if (primaryKeyName != null) 
        //{ 
        // PocoColumn pc; 
        // if (pd.Columns.TryGetValue(primaryKeyName, out pc)) 
        // { 
        //  index = 0; 
        //  foreach (object poco in pocos) 
        //  { 
        //   pc.SetValue(poco, pc.ChangeType(primaryKeys[index])); 
        //   index++; 
        //  } 
        // } 
        //} 

        //return primaryKeys; 
       } 
      } 
      finally 
      { 
       CloseSharedConnection(); 
      } 
     } 
     catch (Exception x) 
     { 
      if (OnException(x)) 
       throw; 
      return null; 
     } 
    } 
2

And along the same lines, if you want BulkUpdate:

public void BulkUpdate<T>(string tableName, string primaryKeyName, IEnumerable<T> pocos, int batchSize = 25) 
{ 
    try 
    { 
     object primaryKeyValue = null; 

     OpenSharedConnection(); 
     try 
     { 
      using (var cmd = CreateCommand(_sharedConnection, "")) 
      { 
       var pd = PocoData.ForObject(pocos.First(), primaryKeyName); 

       int count = 0; 
       do 
       { 
        cmd.CommandText = ""; 
        cmd.Parameters.Clear(); 
        var index = 0; 

        var cmdText = new StringBuilder(); 

        foreach (var poco in pocos.Skip(count).Take(batchSize)) 
        { 
         var sb = new StringBuilder(); 
         var colIdx = 0; 
         foreach (var i in pd.Columns) 
         { 
          // Don't update the primary key, but grab the value if we don't have it 
          if (string.Compare(i.Key, primaryKeyName, true) == 0) 
          { 
           primaryKeyValue = i.Value.GetValue(poco); 
           continue; 
          } 

          // Dont update result only columns 
          if (i.Value.ResultColumn) 
           continue; 

          // Build the sql 
          if (colIdx > 0) 
           sb.Append(", "); 
          sb.AppendFormat("{0} = {1}{2}", _dbType.EscapeSqlIdentifier(i.Key), _paramPrefix, 
              index++); 

          // Store the parameter in the command 
          AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo); 
          colIdx++; 
         } 

         // Find the property info for the primary key 
         PropertyInfo pkpi = null; 
         if (primaryKeyName != null) 
         { 
          pkpi = pd.Columns[primaryKeyName].PropertyInfo; 
         } 


         cmdText.Append(string.Format("UPDATE {0} SET {1} WHERE {2} = {3}{4};\n", 
                _dbType.EscapeTableName(tableName), sb.ToString(), 
                _dbType.EscapeSqlIdentifier(primaryKeyName), _paramPrefix, 
                index++)); 
         AddParam(cmd, primaryKeyValue, pkpi); 
        } 

        if (cmdText.Length == 0) break; 

        if (_providerName.IndexOf("oracle", StringComparison.OrdinalIgnoreCase) >= 0) 
        { 
         cmdText.Insert(0, "BEGIN\n"); 
         cmdText.Append("\n END;"); 
        } 

        DoPreExecute(cmd); 

        cmd.CommandText = cmdText.ToString(); 
        count += batchSize; 
        cmd.ExecuteNonQuery(); 
        OnExecutedCommand(cmd); 

       } while (true); 
      } 
     } 
     finally 
     { 
      CloseSharedConnection(); 
     } 
    } 
    catch (Exception x) 
    { 
     if (OnException(x)) 
      throw; 
    } 
}