Bulk insert/update with PetaPoco

I am using the Save() method to insert or update records, but I would like to perform bulk inserts and bulk updates with only a single database hit. How can I do this?
Answers

You can do a foreach over your records:
foreach (var record in records) {
db.Save(record);
}
I tried two different methods of inserting a large number of rows faster than the default Insert (which is quite slow when you have many rows):

1) Build up a List&lt;T&gt; of POCOs first, then insert them all at once in a loop (and in a single transaction):
using (var tr = PetaPocoDb.GetTransaction())
{
foreach (var record in listOfRecords)
{
PetaPocoDb.Insert(record);
}
tr.Complete();
}
2) SqlBulkCopy a DataTable:
var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock);
bulkCopy.DestinationTableName = "SomeTable";
bulkCopy.WriteToServer(dt);
To convert my List&lt;T&gt; to a DataTable I used Marc Gravell's Convert generic List/Enumerable to DataTable? function, which worked for me OOTB (after I rearranged the order of the POCO properties to match the table fields exactly).
SqlBulkCopy was the fastest, about 50% faster than the transaction method in my (quick) performance tests with ~1000 rows.
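For reference, a minimal sketch of the SqlBulkCopy approach, using a naive reflection-based List-to-DataTable conversion in place of Marc Gravell's helper (the helper names here are illustrative, not part of PetaPoco):

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;

public static class BulkCopyExample
{
    // Naive reflection-based conversion; Marc Gravell's version linked above
    // uses compiled accessors and is faster for large lists.
    public static DataTable ToDataTable<T>(IList<T> items)
    {
        var table = new DataTable(typeof(T).Name);
        var props = typeof(T).GetProperties();
        foreach (var p in props)
            table.Columns.Add(p.Name, Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType);
        foreach (var item in items)
            table.Rows.Add(props.Select(p => p.GetValue(item, null) ?? DBNull.Value).ToArray());
        return table;
    }

    public static void BulkInsert<T>(string connectionString, string tableName, IList<T> items)
    {
        var dt = ToDataTable(items);
        using (var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock))
        {
            bulkCopy.DestinationTableName = tableName;
            bulkCopy.WriteToServer(dt);
        }
    }
}
```

Note that SqlBulkCopy maps columns by ordinal by default, hence the remark above about matching the POCO property order to the table; adding explicit `bulkCopy.ColumnMappings` entries avoids that ordering dependency.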
HTH
In my case, I took advantage of the database.Execute() method.

I created a Sql object containing the first part of my insert:
var sql = new Sql("insert into myTable(Name, Age, Gender) values");
for (int i = 0; i < pocos.Count; ++i)
{
    var p = pocos[i];
    sql.Append("(@0, @1, @2)", p.Name, p.Age, p.Gender);
    if (i != pocos.Count - 1)
        sql.Append(",");
}
Database.Execute(sql);
Inserting in one SQL query is much faster.

Here is a custom method for the PetaPoco.Database class that adds the ability to do a bulk insert of any collection:
public void BulkInsertRecords<T>(IEnumerable<T> collection)
{
try
{
OpenSharedConnection();
using (var cmd = CreateCommand(_sharedConnection, ""))
{
var pd = Database.PocoData.ForType(typeof(T));
var tableName = EscapeTableName(pd.TableInfo.TableName);
string cols = string.Join(", ", (from c in pd.QueryColumns select tableName + "." + EscapeSqlIdentifier(c)).ToArray());
var pocoValues = new List<string>();
var index = 0;
foreach (var poco in collection)
{
var values = new List<string>();
foreach (var i in pd.Columns)
{
values.Add(string.Format("{0}{1}", _paramPrefix, index++));
AddParam(cmd, i.Value.GetValue(poco), _paramPrefix);
}
pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
}
var sql = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues));
cmd.CommandText = sql;
cmd.ExecuteNonQuery();
}
}
finally
{
CloseSharedConnection();
}
}
Very nice, it would be interesting to compare perfs with the transaction method I described (I'm sure yours is faster, but by how much?) Also - afaik - you should get additional performance if you wrap the inserts in a transaction (http://blog.staticvoid.co.nz/2012/4/26/making_dapper_faster_with_transactions) – joeriks
Here is the code for BulkInsert that you can add to v5.01 PetaPoco.cs. You can paste it somewhere close to the regular Insert, around line 1098. You give it an IEnumerable of POCOs and it will send them to the database in batches. The code is 90% from the regular Insert. I do not have performance comparisons yet; let me know :)
/// <summary>
/// Bulk inserts multiple rows to SQL
/// </summary>
/// <param name="tableName">The name of the table to insert into</param>
/// <param name="primaryKeyName">The name of the primary key column of the table</param>
/// <param name="autoIncrement">True if the primary key is automatically allocated by the DB</param>
/// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
/// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>
public void BulkInsert(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos, int batchSize = 25)
{
try
{
OpenSharedConnection();
try
{
using (var cmd = CreateCommand(_sharedConnection, ""))
{
var pd = PocoData.ForObject(pocos.First(), primaryKeyName);
// Create list of columnnames only once
var names = new List<string>();
foreach (var i in pd.Columns)
{
// Don't insert result columns
if (i.Value.ResultColumn)
continue;
// Don't insert the primary key (except under oracle where we need bring in the next sequence value)
if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
{
// Setup auto increment expression
string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
if (autoIncExpression != null)
{
names.Add(i.Key);
}
continue;
}
names.Add(_dbType.EscapeSqlIdentifier(i.Key));
}
var namesArray = names.ToArray();
var values = new List<string>();
int count = 0;
do
{
cmd.CommandText = "";
cmd.Parameters.Clear();
var index = 0;
foreach (var poco in pocos.Skip(count).Take(batchSize))
{
values.Clear();
foreach (var i in pd.Columns)
{
// Don't insert result columns
if (i.Value.ResultColumn) continue;
// Don't insert the primary key (except under oracle where we need bring in the next sequence value)
if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
{
// Setup auto increment expression
string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
if (autoIncExpression != null)
{
values.Add(autoIncExpression);
}
continue;
}
values.Add(string.Format("{0}{1}", _paramPrefix, index++));
AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
}
string outputClause = String.Empty;
if (autoIncrement)
{
outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
}
cmd.CommandText += string.Format("INSERT INTO {0} ({1}){2} VALUES ({3})", _dbType.EscapeTableName(tableName),
string.Join(",", namesArray), outputClause, string.Join(",", values.ToArray()));
}
// Are we done?
if (cmd.CommandText == "") break;
count += batchSize;
DoPreExecute(cmd);
cmd.ExecuteNonQuery();
OnExecutedCommand(cmd);
}
while (true);
}
}
finally
{
CloseSharedConnection();
}
}
catch (Exception x)
{
if (OnException(x))
throw;
}
}
/// <summary>
/// Performs a SQL Bulk Insert
/// </summary>
/// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
/// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>
public void BulkInsert(IEnumerable<object> pocos, int batchSize = 25)
{
if (!pocos.Any()) return;
var pd = PocoData.ForType(pocos.First().GetType());
BulkInsert(pd.TableInfo.TableName, pd.TableInfo.PrimaryKey, pd.TableInfo.AutoIncrement, pocos);
}
Here is an extension of taylonr's very clever idea for PetaPoco's BulkInsert method, using the SQL technique of inserting multiple rows via INSERT INTO tab(col1, col2) OUTPUT inserted.[ID] VALUES (@0, @1), (@2, @3), (@4, @5), ..., (@n-1, @n).

It also returns the auto-increment (identity) values of the inserted records, which I don't believe happens in IvoTops' implementation.
NOTE: SQL Server 2012 (and below) has a limit of 2,100 parameters per query. (This is likely the source of the stack overflow exception referenced by Zelid's comment.) You will need to manually split your batches based on the number of columns that are not decorated as Ignore or Result. For example, a POCO with 21 columns should be sent in batch sizes of 99, or (2100 - 1) / 21. I may refactor this to dynamically split batches based on this SQL Server limit; however, you will always see the best results by managing the batch size outside of this method.
This method showed an approximate 50% gain in execution time over my previous technique of using a shared connection in a single transaction for all inserts.

This is one area where Massive really shines - Massive has a Save(params object[] things) that builds an array of IDbCommands and executes each one on a shared connection. It works out of the box and doesn't run into parameter limits.
/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted. Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
/// <remarks>
/// NOTE: As of SQL Server 2012, there is a limit of 2100 parameters per query. This limitation does not seem to apply on other platforms, so
/// this method will allow more than 2100 parameters. See http://msdn.microsoft.com/en-us/library/ms143432.aspx
/// The name of the table, its primary key, and whether that key is auto-allocated are retrieved from the attributes of the first POCO in the collection
/// </remarks>
public object[] BulkInsert(IEnumerable<object> pocos)
{
Sql sql;
IList<PocoColumn> columns = new List<PocoColumn>();
IList<object> parameters;
IList<object> inserted;
PocoData pd;
Type primaryKeyType;
object template;
string commandText;
string tableName;
string primaryKeyName;
bool autoIncrement;
if (null == pocos)
return new object[] {};
template = pocos.First<object>();
if (null == template)
return null;
pd = PocoData.ForType(template.GetType());
tableName = pd.TableInfo.TableName;
primaryKeyName = pd.TableInfo.PrimaryKey;
autoIncrement = pd.TableInfo.AutoIncrement;
try
{
OpenSharedConnection();
try
{
var names = new List<string>();
var values = new List<string>();
var index = 0;
foreach (var i in pd.Columns)
{
// Don't insert result columns
if (i.Value.ResultColumn)
continue;
// Don't insert the primary key (except under oracle where we need bring in the next sequence value)
if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
{
primaryKeyType = i.Value.PropertyInfo.PropertyType;
// Setup auto increment expression
string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
if (autoIncExpression != null)
{
names.Add(i.Key);
values.Add(autoIncExpression);
}
continue;
}
names.Add(_dbType.EscapeSqlIdentifier(i.Key));
values.Add(string.Format("{0}{1}", _paramPrefix, index++));
columns.Add(i.Value);
}
string outputClause = String.Empty;
if (autoIncrement)
{
outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
}
commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
_dbType.EscapeTableName(tableName),
string.Join(",", names.ToArray()),
outputClause
);
sql = new Sql(commandText);
parameters = new List<object>();
string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
bool isFirstPoco = true;
foreach (object poco in pocos)
{
parameters.Clear();
foreach (PocoColumn column in columns)
{
parameters.Add(column.GetValue(poco));
}
sql.Append(valuesText, parameters.ToArray<object>());
if (isFirstPoco)
{
valuesText = "," + valuesText;
isFirstPoco = false;
}
}
inserted = new List<object>();
using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
{
if (!autoIncrement)
{
DoPreExecute(cmd);
cmd.ExecuteNonQuery();
OnExecutedCommand(cmd);
PocoColumn pkColumn;
if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
{
foreach (object poco in pocos)
{
inserted.Add(pkColumn.GetValue(poco));
}
}
return inserted.ToArray<object>();
}
// BUG: the following line reportedly causes duplicate inserts; need to confirm
//object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);
using(var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
inserted.Add(reader[0]);
}
}
object[] primaryKeys = inserted.ToArray<object>();
// Assign the ID back to the primary key property
if (primaryKeyName != null)
{
PocoColumn pc;
if (pd.Columns.TryGetValue(primaryKeyName, out pc))
{
index = 0;
foreach(object poco in pocos)
{
pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
index++;
}
}
}
return primaryKeys;
}
}
finally
{
CloseSharedConnection();
}
}
catch (Exception x)
{
if (OnException(x))
throw;
return null;
}
}
[Another answer](http://stackoverflow.com/a/17262788/38360) pointed out a serious bug in the auto-increment path here that caused every record to be inserted twice - but otherwise this approach worked throughout. All I needed to do was remove the following line: `object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);`. I would go ahead and edit your post, but you should check and make sure that line is safe to remove. – Aaronaught
Hi @Aaronaught, thanks for the heads-up. I've commented out that line until I can get access to a Windows box with VS to test. The bug doesn't surprise me, as the DB I created this code against is fairly forgiving of duplicate inserts. –
Here is an updated version of Steve Jansen's answer that splits the collection into chunks so as to stay under the maximum of 2,100 parameters. I commented out the following code because it was producing duplicates in the database...
//using (var reader = cmd.ExecuteReader())
//{
// while (reader.Read())
// {
// inserted.Add(reader[0]);
// }
//}
Updated code:
/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted. Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
public object BulkInsert(IEnumerable<object> pocos)
{
Sql sql;
IList<PocoColumn> columns = new List<PocoColumn>();
IList<object> parameters;
IList<object> inserted;
PocoData pd;
Type primaryKeyType;
object template;
string commandText;
string tableName;
string primaryKeyName;
bool autoIncrement;
int maxBulkInsert;
if (null == pocos)
{
return new object[] { };
}
template = pocos.First<object>();
if (null == template)
{
return null;
}
pd = PocoData.ForType(template.GetType());
tableName = pd.TableInfo.TableName;
primaryKeyName = pd.TableInfo.PrimaryKey;
autoIncrement = pd.TableInfo.AutoIncrement;
//Calculate the maximum chunk size
maxBulkInsert = 2100/pd.Columns.Count;
IEnumerable<object> pacosToInsert = pocos.Take(maxBulkInsert);
IEnumerable<object> pacosremaining = pocos.Skip(maxBulkInsert);
try
{
OpenSharedConnection();
try
{
var names = new List<string>();
var values = new List<string>();
var index = 0;
foreach (var i in pd.Columns)
{
// Don't insert result columns
if (i.Value.ResultColumn)
continue;
// Don't insert the primary key (except under oracle where we need bring in the next sequence value)
if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
{
primaryKeyType = i.Value.PropertyInfo.PropertyType;
// Setup auto increment expression
string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
if (autoIncExpression != null)
{
names.Add(i.Key);
values.Add(autoIncExpression);
}
continue;
}
names.Add(_dbType.EscapeSqlIdentifier(i.Key));
values.Add(string.Format("{0}{1}", _paramPrefix, index++));
columns.Add(i.Value);
}
string outputClause = String.Empty;
if (autoIncrement)
{
outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
}
commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
_dbType.EscapeTableName(tableName),
string.Join(",", names.ToArray()),
outputClause
);
sql = new Sql(commandText);
parameters = new List<object>();
string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
bool isFirstPoco = true;
var parameterCounter = 0;
foreach (object poco in pacosToInsert)
{
parameterCounter++;
parameters.Clear();
foreach (PocoColumn column in columns)
{
parameters.Add(column.GetValue(poco));
}
sql.Append(valuesText, parameters.ToArray<object>());
if (isFirstPoco && pocos.Count() > 1)
{
valuesText = "," + valuesText;
isFirstPoco = false;
}
}
inserted = new List<object>();
using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
{
if (!autoIncrement)
{
DoPreExecute(cmd);
cmd.ExecuteNonQuery();
OnExecutedCommand(cmd);
PocoColumn pkColumn;
if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
{
foreach (object poco in pocos)
{
inserted.Add(pkColumn.GetValue(poco));
}
}
return inserted.ToArray<object>();
}
object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);
if (pacosremaining.Any())
{
return BulkInsert(pacosremaining);
}
return id;
//using (var reader = cmd.ExecuteReader())
//{
// while (reader.Read())
// {
// inserted.Add(reader[0]);
// }
//}
//object[] primaryKeys = inserted.ToArray<object>();
//// Assign the ID back to the primary key property
//if (primaryKeyName != null)
//{
// PocoColumn pc;
// if (pd.Columns.TryGetValue(primaryKeyName, out pc))
// {
// index = 0;
// foreach (object poco in pocos)
// {
// pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
// index++;
// }
// }
//}
//return primaryKeys;
}
}
finally
{
CloseSharedConnection();
}
}
catch (Exception x)
{
if (OnException(x))
throw;
return null;
}
}
And along the same lines, if you want a BulkUpdate:
public void BulkUpdate<T>(string tableName, string primaryKeyName, IEnumerable<T> pocos, int batchSize = 25)
{
try
{
object primaryKeyValue = null;
OpenSharedConnection();
try
{
using (var cmd = CreateCommand(_sharedConnection, ""))
{
var pd = PocoData.ForObject(pocos.First(), primaryKeyName);
int count = 0;
do
{
cmd.CommandText = "";
cmd.Parameters.Clear();
var index = 0;
var cmdText = new StringBuilder();
foreach (var poco in pocos.Skip(count).Take(batchSize))
{
var sb = new StringBuilder();
var colIdx = 0;
foreach (var i in pd.Columns)
{
// Don't update the primary key, but grab the value if we don't have it
if (string.Compare(i.Key, primaryKeyName, true) == 0)
{
primaryKeyValue = i.Value.GetValue(poco);
continue;
}
// Dont update result only columns
if (i.Value.ResultColumn)
continue;
// Build the sql
if (colIdx > 0)
sb.Append(", ");
sb.AppendFormat("{0} = {1}{2}", _dbType.EscapeSqlIdentifier(i.Key), _paramPrefix,
index++);
// Store the parameter in the command
AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
colIdx++;
}
// Find the property info for the primary key
PropertyInfo pkpi = null;
if (primaryKeyName != null)
{
pkpi = pd.Columns[primaryKeyName].PropertyInfo;
}
cmdText.Append(string.Format("UPDATE {0} SET {1} WHERE {2} = {3}{4};\n",
_dbType.EscapeTableName(tableName), sb.ToString(),
_dbType.EscapeSqlIdentifier(primaryKeyName), _paramPrefix,
index++));
AddParam(cmd, primaryKeyValue, pkpi);
}
if (cmdText.Length == 0) break;
if (_providerName.IndexOf("oracle", StringComparison.OrdinalIgnoreCase) >= 0)
{
cmdText.Insert(0, "BEGIN\n");
cmdText.Append("\n END;");
}
DoPreExecute(cmd);
cmd.CommandText = cmdText.ToString();
count += batchSize;
cmd.ExecuteNonQuery();
OnExecutedCommand(cmd);
} while (true);
}
}
finally
{
CloseSharedConnection();
}
}
catch (Exception x)
{
if (OnException(x))
throw;
}
}
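A hedged usage sketch for the two methods above; the table, column names, and `Person` POCO are illustrative, and `db` is assumed to be a `PetaPoco.Database` extended with these methods:

```csharp
// Hypothetical POCO; PetaPoco mapping attributes omitted for brevity
public class Person
{
    public int Id { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
}

var people = Enumerable.Range(0, 500)
    .Select(i => new Person { Name = "Person " + i, Age = 20 + (i % 50) })
    .ToList();

// One multi-row INSERT per batch of 25 POCOs
db.BulkInsert("People", "Id", true, people, 25);

// Later: one batched block of UPDATE statements per 25 POCOs, keyed on Id
foreach (var p in people) p.Age++;
db.BulkUpdate("People", "Id", people, 25);
```

Keep the parameter limit from the note above in mind when choosing batchSize: with 3 insertable columns per Person, a batch of 25 uses only 75 parameters, well under SQL Server's 2,100 cap.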
Does this only create one database hit? – Dan

No, it hits the database once per record. How do you expect to do it in one database hit? Unless you just want to generate a single update statement and execute that. – Schotime