2014-03-26 65 views
0

我已經創建了一些異步方法,並且已經將這些記錄添加到這些方法中的dataTable中,然後在異步方法完成後批量複製dataTable。我發現dataTables不是線程安全的,這是我的問題的根源,因爲我注意到一個或兩個記錄沒有實際插入。我有這樣的事情:異步方法中的數據表

private async void yea() 
    { 
     DataTable t = new DataTable();  
     //Fill the data table with its columns 
     IEnumerable<Task<string>> results = items.Select(q => AsyncFunction(q.id, t)); 
     Task<string[]> allTasks = Task.WhenAll(results); 
     string[] allResults = await results; 

     using (SqlConnection conn = new SqlConnection(_connString)) 
     { 
      conn.Open(); 
      using (SqlBulkCopy bc = new SqlBulkCopy(conn)) 
      { 
       bc.BatchSize = 1000; 
       bc.DestinationTableName = tableName; 
       bc.WriteToServer(t); 
      } 
     } 
    } 

    public async Task<string> AsyncFunction(int id, DataTable t) 
    { 
     //await another Async function 
     DataRow dr = t.NewRow(); 
     dr["ID"] = id; 
     //Many More columns 
     t.Rows.Add(dr); 
     return "success"; 
    } 

就像我說的我的問題是,大容量複製往往會遺漏一些記錄。如何批量複製所有記錄而不會丟失到非線程安全調用?

回答

1

每個任務只做一個數據表,當你去上傳的時候你可以再次將它們連接在一起,你的代碼不完整(例如Result來自哪裏以及它是如何使用的),但這裏是基本的你可以做到這一點,你需要適應你的代碼。

DataTable t = new DataTable(); 
//Fill the data table with its columns 
IEnumerable<Task<Result>> results = items.Select(q => AsyncFunction(q.id, t); 

Task<NewResult[]> allTasks = Task.WhenAll(results); //This line is unnecessary with the code available. 

NewResult[] allResults = await results; 


using (SqlConnection conn = new SqlConnection(_connString)) 
{ 
    conn.Open(); 
    using (SqlBulkCopy bc = new SqlBulkCopy(conn)) 
    { 
     bc.BatchSize = 1000; 
     bc.DestinationTableName = tableName; 

     //Joins all of the data rows from all of the generated tables in to a single array. 
     DataRow[] allRows = allResults.SelectMany(a=>a.LocalDataTable.AsEnumerable().ToArray(); 

     bc.WriteToServer(allRows); 
    } 
} 


public async Task<NewResult> AsyncFunction(int id, DataTable template) 
{ 
    DataTable localDataTable = template.Clone(); //DataTable is thread safe for read operations like .Clone()   

    //Do some stuff 
    DataRow dr = t.NewRow(); 
    dr["ID"] = id 
    //Many More columns 
    t.Rows.Add(dr); 

    //Be sure you have a "await" somewhere in that removed section or else this code will not be multi-threaded. 

    return new NewResult("success", localDataTable);  
} 

你也可以調用每個類.CreateDataReader()然後寫一個包裝,將連接多個IDataReaders在一起,這樣你就不會需要分配額外的DataRow[]或只是調用SqlBulkCopy多次,一次爲每個數據表。

2

不要在異步方法中改變DataTable。使用異步方法計算行,將其返回,然後在獲得結果後添加所有行:

foreach(var row in await Task.WhenAll(items.Select(q => AsyncFunction(q.id, t)) 
    t.Rows.Add(row);