2016-02-03 82 views
0

我有一個IEnumerable實體,它擁有大約100萬條記錄。我想執行Parallel.ForEach來插入這些數據。任務並行庫拋出NullReferenceException與IEnumerable

這裏說的是類我有什麼:Employee.cs

SqlConneciton conn = base.GetConnection(); 
conn.open(); 
IEnumerable<Employee> employeeList = GetListofEmployeesFromDB(); 
Parallel.ForEach(employeeList 
, employee => 
{ 
    employee.add(conn, sqlTransaction); 
}); 

Empployee.cs 
{ 
     public void add(SqlConnection conn, SqlTransaction sqlTransaction) 
     { 
      using (SqlCommand insertCmd = new SqlCommand("EmployeeInsert", conn)) 
      { 
       insertCmd.CommandType = CommandType.StoredProcedure; 
       insertCmd.Transaction = transaction; 
       insertCmd.Parameters["@Name"].Value = this.Name; 
       insertCmd.ExecuteNonQuery(); 
       this.id = (int)insertCmd.Parameters["@Id"].Value; 
      } 
     } 
} 

隨着數據的插入,我看到有一個NPE在:

this.id = (int)insertCmd.Parameters["@Id"].Value; 

不知道我我失蹤有或沒有。這裏是我看到的例外。

System.AggregateException was unhandled 
    Message=AggregateException_ctor_DefaultMessage 
    Source=System.Threading 
    StackTrace: 
     at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken) 
     at System.Threading.Tasks.Task.Wait() 
     at System.Threading.Tasks.Parallel.PartitionerForEachWorker[TSource,TLocal](Partitioner`1 source, ParallelOptions parallelOptions, Action`1 simpleBody, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally) 
     at System.Threading.Tasks.Parallel.ForEach[TSource](Partitioner`1 source, ParallelOptions parallelOptions, Action`1 body) 
     : 
     : 
     : 
     at System.Threading.ExecutionContext.runTryCode(Object userData) 
     at System.Runtime.CompilerServices.RuntimeHelpers.ExecuteCodeWithGuaranteedCleanup(TryCode code, CleanupCode backoutCode, Object userData) 
     at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state) 
     at System.Threading.ThreadHelper.ThreadStart(Object obj) 
    InnerException: System.NullReferenceException 
     Message=Object reference not set to an instance of an object. 
     Source=Jobvite.Library 
     StackTrace:    
      : 
      : 
      : 
      at System.Threading.Tasks.Parallel.<>c__DisplayClass32`2.<PartitionerForEachWorker>b__30() 
      at System.Threading.Tasks.Task.InnerInvoke() 
      at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask) 
      at System.Threading.Tasks.Task.<>c__DisplayClass3.<ExecuteSelfReplicating>b__2(Object) 
     InnerException: 
+0

出於好奇,如果你使用你的'Employee.Add'一個新的連接,你得到同樣的錯誤? – Balah

+0

只有在堆棧跟蹤中帶':'的行的含義是什麼? –

+0

@balah:我可以嘗試,但建議不使用Parallel.ForEach for DB。 – nimi

回答

1

System.AggregateException是因應用程序引發多個異常而引發的。

原因
您正在以並行模式訪問Connection對象。多個任務試圖在同一時間訪問它,並在無法控制它時引發異常。一次只有一個線程可以訪問數據庫連接。
創建多個線程將數據插入數據庫不會加快速度。 (即使您設法找到任何並行方法),因爲每次寫入數據時DB都會被鎖定,並且所有數據都將按順序插入。

使用正常的插入過程,它會更快。

0

(一旦我找出「1 lac」是什麼)看起來你想要做一個Bulk Insert。您可以使用SqlBulkCopy來實現這一目的 - 它旨在有效加載SQL表。

但是,我看到你也希望ids回來,所以上面不會讓你一路。我看你使用存儲過程,一個方式做的(假設你有SQL 2008及以上):

  1. 創建一個表值數據類型包含要插入的數據。

    CREATE TYPE [dbo].[EmployeeDataType] As Table 
    (
        ID INT, 
        -- employee details 
    ) 
    
  2. 更改您的存儲過程使用此表值參數作爲輸入,當它執行插入,它和輸出。例如

    CREATE PROCEDURE [dbo].[EmployeeInsert] 
    (
        @EmployeeInsertParameter As [dbo].[EmployeeDataType] READONLY 
    ) 
    AS 
    ... 
    
    INSERT INTO Employee 
    SELECT * FROM @EmployeeInsertParameter e 
    OUTPUT INSERTED.* 
    

(很明顯,你會說出列,而不是使用*)

  • 更改您的代碼使用Parallel.ForEach,而是這樣做:

    DataTable employeeDataTable = new DataTable("EmployeeDataType"); 
    // fill in the rows using 
    ... 
    insertCmd.Parameters["@EmployeeInsertParameter"].Value = employeeDataTable; 
    ... 
    
  • 讀取所存儲的程序執行到的結果

  • 結論:基本上不使用Parallel.For爲DB連接。通過這種方式,您可以正確使用一個連接(不會導致「NPE」),並且大部分處理都將在內存中完成,只要您擁有RAM,它的速度就會快幾個數量級。


    下面是另一個例子可能的方式,但更復雜:https://stackoverflow.com/a/21689413/3419825

    相關問題