2013-04-18 27 views
3

我需要加快在我的應用程序中執行12個查詢。我從常規的foreach切換到Parallel.ForEach。但有時候我會收到一個錯誤消息,說「ExecuteReader需要一個開放的可用連接連接的當前狀態正在連接。」我的理解是,由於12個查詢中的很多都使用相同的InitialCatalog,所以12中沒有真正的新連接,這可能是問題所在?我怎樣才能解決這個問題? 「sql」是類型「Sql」的列表 - 一個類只是一個字符串名稱,字符串connectiona和一個查詢列表。下面是代碼:Parallel.Foreach SQL查詢有時會導致連接

/// <summary> 
    /// Connects to SQL, performs all queries and stores results in a list of DataTables 
    /// </summary> 
    /// <returns>List of data tables for each query in the config file</returns> 
    public List<DataTable> GetAllData() 
    { 
     Stopwatch sw = new Stopwatch(); 
     sw.Start(); 
     List<DataTable> data = new List<DataTable>(); 

     List<Sql> sql=new List<Sql>(); 

     Sql one = new Sql(); 
     one.connection = "Data Source=XXX-SQL1;Initial Catalog=XXXDB;Integrated Security=True"; 
     one.name = "Col1"; 
     one.queries.Add("SELECT Name FROM [Reports]"); 
     one.queries.Add("SELECT Other FROM [Reports2]"); 
     sql.Add(one); 

     Sql two = new Sql(); 
     two.connection = "Data Source=XXX-SQL1;Initial Catalog=XXXDB;Integrated Security=True"; 
     two.name = "Col2"; 
     two.queries.Add("SELECT AlternateName FROM [Reports1]"); 
     sql.Add(two); 

     Sql three = new Sql(); 
     three.connection = "Data Source=YYY-SQL2;Initial Catalog=YYYDB;Integrated Security=True"; 
     three.name = "Col3"; 
     three.queries.Add("SELECT Frequency FROM Times"); 
     sql.Add(three); 


     try 
     { 
      // ParallelOptions options = new ParallelOptions(); 
      //options.MaxDegreeOfParallelism = 3; 
      // Parallel.ForEach(sql, options, s => 
      Parallel.ForEach(sql, s => 
      //foreach (Sql s in sql) 
      { 
       foreach (string q in s.queries) 
       { 
        using (connection = new SqlConnection(s.connection)) 
        { 
         connection.Open(); 
         DataTable dt = new DataTable(); 
         dt.TableName = s.name; 
         command = new SqlCommand(q, connection); 
         SqlDataAdapter adapter = new SqlDataAdapter(); 
         adapter.SelectCommand = command; 
         adapter.Fill(dt); 
         //adapter.Dispose(); 

         lock (data) 
         { 
          data.Add(dt); 
         } 
        } 
       } 
      } 
      ); 
     } 
     catch (Exception ex) 
     { 
      MessageBox.Show(ex.ToString(), "GetAllData error"); 
     } 

     sw.Stop(); 
     MessageBox.Show(sw.Elapsed.ToString()); 

     return data; 
    } 

這裏的我做了你需要的SQL類:

/// <summary> 
/// Class defines a SQL connection and its respective queries 
/// </summary> 
public class Sql 
{ 
    /// <summary> 
    /// Name of the connection/query 
    /// </summary> 
    public string name { get; set; } 
    /// <summary> 
    /// SQL Connection string 
    /// </summary> 
    public string connection { get; set; } 
    /// <summary> 
    /// List of SQL queries for a connection 
    /// </summary> 
    public List<string> queries = new List<string>(); 
} 

回答

6

我會重構你的業務邏輯(連接到數據庫)。

public class SqlOperation 
{ 
    public SqlOperation() 
    { 
     Queries = new List<string>(); 
    } 

    public string TableName { get; set; } 
    public string ConnectionString { get; set; } 
    public List<string> Queries { get; set; } 
} 

public static List<DataTable> GetAllData(IEnumerable<SqlOperation> sql) 
{ 
    var taskArray = 
     sql.SelectMany(s => 
      s.Queries 
      .Select(query => 
       Task.Run(() => //Task.Factory.StartNew for .NET 4.0 
        ExecuteQuery(s.ConnectionString, s.TableName, query)))) 
      .ToArray(); 

    try 
    { 
     Task.WaitAll(taskArray); 
    } 
    catch(AggregateException e) 
    { 
     MessageBox.Show(e.ToString(), "GetAllData error"); 
    } 

    return taskArray.Where(t => !t.IsFaulted).Select(t => t.Result).ToList(); 
} 

public static DataTable ExecuteQuery(string connectionString, string tableName, string query) 
{ 
    DataTable dataTable = null; 

    using (var connection = new SqlConnection(connectionString)) 
    { 
     dataTable = new DataTable(); 
     dataTable.TableName = tableName; 
     using(var command = new SqlCommand(query, connection)) 
     { 
      connection.Open(); 

      using(var adapter = new SqlDataAdapter()) 
      { 
       adapter.SelectCommand = command; 
       adapter.Fill(dataTable); 
      } 
     } 
    } 

    return dataTable; 
} 
+1

爲什麼你使用'Parallel.ForEach'啓動你應該連續地開始任務,或者讓'Parallel.ForEach'處理並行化,在這種情況下,我沒有理由不去做後者。 – Servy

+0

我只是試圖用Parallel.ForEach替換foreach循環。基本上,由於一些這些查詢需要一段時間,我一次只想做多個查詢。 –

+1

@Romoku:看起來很好,但產生錯誤:System.Threading.Tasks.Task不包含「結果」的定義和沒有擴展方法「結果」接受類型「System.Threading.Tasks.Task」的第一個參數可以是找到並且'System.Threading.Tasks.Task'不包含'Run'的定義。有什麼不同的參考我需要什麼? –

4

Ado.Net有一個非常聰明的連接池,所以一般你應該打開的連接和關閉每個命令的連接並讓池處理它們是否真的被打開或關閉。

所以每個命令一個連接:

Parallel.ForEach(sql, s=> 
      //foreach (Sql s in sql) 
      { 
       foreach (string q in s.queries) 
       { 
        using (connection = new SqlConnection(s.connection)) 
        { 
         connection.Open(); 
         DataTable dt = new DataTable(); 
         dt.TableName = s.name; 
         command = new SqlCommand(q, connection); 
         SqlDataAdapter adapter = new SqlDataAdapter(); 
         adapter.SelectCommand = command; 
         adapter.Fill(dt); 
         //adapter.Dispose(); 

         lock(data){ 
          data.Add(dt); 
         } 
        } 
       } 
      } 
+5

他還需要同步'data.Add(dt);'因爲'List(T).Add'不是線程安全的。 – Romoku

+0

Hmmmmm ...想這(將在foreach●在查詢中使用結果之前,在一個錯誤說無法打開數據庫「XXX」由登錄請求。登錄失敗。用戶登錄失敗「XXXXX」。每次。 –

+1

@ user1029770:。。對不起,連接需要開放參見編輯 – faester

0

您還可以使用MultipleActiveResultSets = TRUE; in連接字符串以支持多個讀取器