2011-09-30 42 views
0

我們注意到,在我們的 .NET 應用程序內部,使用 SqlDataReader 時存在爭用。雖然我們知道 SqlDataReader 不是線程安全(thread-safe)的,但它應該能夠隨線程數擴展。下面的代碼是一個簡單的例子,用來表明:由於 SqlDataReader 的 GetValue 方法上存在爭用,我們無法擴展應用程序。我們並未受到 CPU、磁盤或網絡的限制;瓶頸只在於 SqlDataReader 的內部爭用。我們可以用 1 個線程把應用程序同時運行 10 次,吞吐量會線性增長;但在 1 個應用程序中使用 10 個線程卻無法擴展。關於如何在單個 C# 應用程序中擴展從 SQL Server 的讀取,有什麼想法嗎?(標題:C# 中 SqlDataReader 與 SqlDataAdapter 的線程爭用)

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Diagnostics; 
using System.Globalization; 

namespace ThreadAndSQLTester 
{ 
    /// <summary>
    /// Creates a configurable number of <see cref="Worker"/> instances, runs each one on its
    /// own dedicated thread, and optionally stops them all after a fixed number of minutes.
    /// </summary>
    class Host
    {
        /// <summary>
        /// Gets or sets the workers created by <see cref="Start"/>. Emptied by <see cref="Stop"/>.
        /// </summary>
        /// <value>The workers.</value>
        internal List<Worker> Workers { get; set; }

        /// <summary>
        /// Gets or sets the threads running the workers, in the same order as <see cref="Workers"/>.
        /// Emptied by <see cref="Stop"/>.
        /// </summary>
        /// <value>The worker threads.</value>
        internal List<Thread> Threads { get; set; }

        /// <summary>Number of worker threads that <see cref="Start"/> launches.</summary>
        public int NumberOfThreads { get; set; }

        /// <summary>Value copied to each worker's <c>Sleep</c> property.</summary>
        public int Sleep { get; set; }

        /// <summary>Minutes until the run timer stops the host; 0 or less disables the timer.</summary>
        public int MinutesToRun { get; set; }

        /// <summary>
        /// True between <see cref="Start"/> and the moment the run timer has stopped the workers.
        /// </summary>
        public bool IsRunning
        {
            get { return this.isRunning; }
            set { this.isRunning = value; }
        }

        // Written by the timer callback and by Start(), which run on different threads;
        // volatile makes each write visible to whichever thread reads the flag.
        private volatile bool isRunning;

        private System.Timers.Timer runTime;

        // Guards Workers and Threads, which are touched by Start() and by Stop()
        // (the latter is invoked from the timer's callback thread).
        private readonly object lockVar = new object();

        /// <summary>Creates a host with one thread, no sleep and no run-time limit.</summary>
        public Host()
        {
            Init(1, 0, 0);
        }

        /// <summary>Creates a host with the given thread count, sleep value and run time.</summary>
        /// <param name="numberOfThreads">Number of worker threads to launch.</param>
        /// <param name="sleep">Value passed to each worker's <c>Sleep</c> property.</param>
        /// <param name="minutesToRun">Minutes before the host stops itself; 0 disables the timer.</param>
        public Host(int numberOfThreads, int sleep, int minutesToRun)
        {
            Init(numberOfThreads, sleep, minutesToRun);
        }

        /// <summary>Shared constructor logic: stores the settings and arms the run timer.</summary>
        private void Init(int numberOfThreads, int sleep, int minutesToRun)
        {
            this.Workers = new List<Worker>();
            this.Threads = new List<Thread>();

            this.NumberOfThreads = numberOfThreads;
            this.Sleep = sleep;
            this.MinutesToRun = minutesToRun;

            SetUpTimer();
        }

        /// <summary>
        /// Starts a single-shot timer that fires after <see cref="MinutesToRun"/> minutes.
        /// Does nothing when <see cref="MinutesToRun"/> is not positive.
        /// </summary>
        private void SetUpTimer()
        {
            if (this.MinutesToRun <= 0)
            {
                return;
            }

            this.runTime = new System.Timers.Timer();
            this.runTime.Interval = TimeSpan.FromMinutes(this.MinutesToRun).TotalMilliseconds;
            // The handler stops the timer anyway; single-shot avoids a second
            // callback being queued while Stop() is still waiting for the workers.
            this.runTime.AutoReset = false;
            this.runTime.Elapsed += new System.Timers.ElapsedEventHandler(runTime_Elapsed);
            this.runTime.Start();
        }

        /// <summary>Timer callback: stops the timer and all workers, then clears <see cref="IsRunning"/>.</summary>
        void runTime_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            this.runTime.Stop();
            this.Stop();
            this.IsRunning = false;
        }

        /// <summary>
        /// Creates <see cref="NumberOfThreads"/> workers and starts one thread per worker.
        /// Each worker receives a random pool token "-1" .. "-10".
        /// </summary>
        public void Start()
        {
            this.IsRunning = true;

            Random random = new Random();

            // Same lock as Stop(), so the lists are never cleared while being populated
            // and Stop() never sees a thread that was added but not yet started.
            lock (lockVar)
            {
                for (int i = 0; i < this.NumberOfThreads; i++)
                {
                    string threadPoolId = random.Next(1, 11).ToString(CultureInfo.InvariantCulture);

                    Worker worker = new Worker("-" + threadPoolId);
                    worker.Sleep = this.Sleep;
                    worker.Name = string.Format("WorkerThread-{0}", i);
                    this.Workers.Add(worker);

                    Thread thread = new Thread(worker.Work);
                    thread.Name = worker.Name;
                    this.Threads.Add(thread);
                    thread.Start();

                    Debug.WriteLine(string.Format(CultureInfo.InvariantCulture, "Started new Worker Thread. Total active: {0}", i + 1));
                }
            }
        }

        /// <summary>
        /// Asks every worker to finish, waits for all worker threads to exit, and then
        /// forgets the workers and threads. Blocks the caller until the threads have ended.
        /// </summary>
        public void Stop()
        {
            if (this.Workers == null)
            {
                return;
            }

            lock (lockVar)
            {
                // Signal all workers first so they wind down in parallel.
                foreach (Worker worker in this.Workers)
                {
                    worker.IsEnabled = false;
                }

                // Join instead of polling Worker.IsRunning and then aborting: a worker that
                // has not yet set IsRunning would pass the poll and be aborted before it
                // logs, and Thread.Abort is not supported on current .NET runtimes.
                foreach (Thread thread in this.Threads)
                {
                    thread.Join();
                }

                this.Workers.Clear();
                this.Threads.Clear();
            }
        }

    }
} 

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Data.SqlClient; 
using System.Data; 
using System.Threading; 
using System.ComponentModel; 
using System.Data.OleDb; 

namespace ThreadAndSQLTester 
{ 
    /// <summary>
    /// Repeatedly executes the <c>Analysis.spSelectTestData</c> stored procedure until
    /// <see cref="IsEnabled"/> is cleared, recording one <c>Log</c> entry per pass, and then
    /// writes the collected entries through <c>Analysis.spInsertWorkItemRouteLog</c>.
    /// </summary>
    class Worker
    {
        private const string ServerName = @"trgcrmqa3";
        private const string DatabaseName = "Scratch";

        // Both flags are written by one thread and read by another (the worker thread and
        // whoever controls it); volatile makes each write visible to the reader.
        private volatile bool isEnabled;
        private volatile bool isRunning;

        /// <summary>While true, <see cref="Work"/> keeps looping. Set to false to request a stop.</summary>
        public bool IsEnabled
        {
            get { return this.isEnabled; }
            set { this.isEnabled = value; }
        }

        /// <summary>True from the moment <see cref="Work"/> is entered until it is about to return.</summary>
        public bool IsRunning
        {
            get { return this.isRunning; }
            set { this.isRunning = value; }
        }

        /// <summary>Display name used in console output and stored in each log entry.</summary>
        public string Name { get; set; }

        /// <summary>Milliseconds to sleep after each pass; 0 or less means no sleep.</summary>
        public int Sleep { get; set; }

        private string dataCnString { get; set; }
        private string logCnString { get; set; }
        private string machineName { get; set; }

        private List<Log> Logs { get; set; }

        /// <summary>
        /// Builds the data and log connection strings. The data connection string gets
        /// <paramref name="threadPoolId"/> appended to its workstation id.
        /// </summary>
        /// <param name="threadPoolId">Token appended to the machine name for the data connection.</param>
        public Worker(string threadPoolId)
        {
            this.Logs = new List<Log>();

            SqlConnectionStringBuilder dataBuilder = CreateBaseBuilder();
            dataBuilder.MultipleActiveResultSets = true;
            dataBuilder.Pooling = true;
            dataCnString = GetConnectionStringWithWorkStationId(dataBuilder.ToString(), threadPoolId);

            SqlConnectionStringBuilder logBuilder = CreateBaseBuilder();
            logCnString = GetConnectionStringWithWorkStationId(logBuilder.ToString(), string.Empty);

            IsEnabled = true;
        }

        /// <summary>Creates a builder with the server, database and integrated security preset.</summary>
        private static SqlConnectionStringBuilder CreateBaseBuilder()
        {
            SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
            builder.DataSource = ServerName;
            builder.InitialCatalog = DatabaseName;
            builder.IntegratedSecurity = true;
            return builder;
        }

        /// <summary>
        /// Returns <paramref name="connectionString"/> with its workstation id set to the
        /// machine name followed by <paramref name="connectionPoolToken"/>.
        /// </summary>
        /// <exception cref="ArgumentException">The connection string could not be parsed.</exception>
        private string GetConnectionStringWithWorkStationId(string connectionString, string connectionPoolToken)
        {
            if (string.IsNullOrEmpty(machineName))
            {
                machineName = Environment.MachineName;
            }

            SqlConnectionStringBuilder builder;
            try
            {
                builder = new SqlConnectionStringBuilder(connectionString);
            }
            catch (Exception ex)
            {
                // Same message as before, but keep the original failure as InnerException.
                throw new ArgumentException("connection string was an invalid format", ex);
            }

            builder.WorkstationID = machineName + connectionPoolToken;

            return builder.ConnectionString;
        }

        /// <summary>
        /// Thread entry point. Loops while <see cref="IsEnabled"/> is true, running one read
        /// pass per iteration and recording a log entry for it, then writes all log entries.
        /// A failure in a pass is reported to the error stream and the loop continues.
        /// </summary>
        public void Work()
        {
            int i = 0;

            this.IsRunning = true;

            try
            {
                while (this.IsEnabled)
                {
                    try
                    {
                        Log log = new Log();
                        log.WorkItemId = Guid.NewGuid();
                        log.StartTime = DateTime.Now;

                        try
                        {
                            ReadTestData();
                        }
                        catch (Exception ex)
                        {
                            // Previously swallowed silently; a failed pass is still logged
                            // below, exactly as before, but the failure is now visible.
                            ReportError("reading test data", ex);
                        }

                        log.StopTime = DateTime.Now;
                        log.RouteName = this.Name;
                        log.HostName = this.machineName;

                        this.Logs.Add(log);
                        i++;

                        if (i > 1000)
                        {
                            Console.WriteLine(string.Format("Thread: {0} executed {1} items.", this.Name, i));
                            i = 0;
                        }

                        if (this.Sleep > 0)
                        {
                            Thread.Sleep(this.Sleep);
                        }
                    }
                    catch (Exception ex)
                    {
                        ReportError("processing a work item", ex);
                    }
                }

                this.LogMessages();
            }
            finally
            {
                // Reset even when LogMessages throws, so anything polling IsRunning
                // is not left waiting forever.
                this.IsRunning = false;
            }
        }

        /// <summary>
        /// Opens a data connection, runs <c>Analysis.spSelectTestData</c> and reads every row
        /// into a throw-away list.
        /// </summary>
        private void ReadTestData()
        {
            List<object> values = new List<object>();

            using (SqlConnection cn = new SqlConnection(this.dataCnString))
            using (SqlCommand cmd = new SqlCommand("Analysis.spSelectTestData", cn))
            {
                cmd.CommandType = System.Data.CommandType.StoredProcedure;
                cn.Open();

                // NOTE(review): SequentialAccess requires columns to be read in increasing
                // ordinal order, while CreateClaimHeader2 reads them by name. This assumes
                // the procedure returns the columns in that same order - confirm.
                using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
                {
                    while (dr.Read())
                    {
                        CreateClaimHeader2(dr, values);
                    }
                }
            }
        }

        /// <summary>Writes a failure on this worker to the standard error stream.</summary>
        private void ReportError(string activity, Exception ex)
        {
            Console.Error.WriteLine(string.Format("Thread: {0} failed while {1}: {2}", this.Name, activity, ex));
        }

        /// <summary>Appends the sixteen claim-header column values of the current row to <paramref name="lst"/>.</summary>
        private void CreateClaimHeader2(IDataReader reader, List<object> lst)
        {
            lst.Add(reader["ClaimHeaderID"]);
            lst.Add(reader["ClientCode"]);
            lst.Add(reader["MemberID"]);
            lst.Add(reader["ProviderID"]);
            lst.Add(reader["ClaimNumber"]);
            lst.Add(reader["PatientAcctNumber"]);
            lst.Add(reader["Source"]);
            lst.Add(reader["SourceID"]);
            lst.Add(reader["TotalPayAmount"]);
            lst.Add(reader["TotalBillAmount"]);
            lst.Add(reader["FirstDateOfService"]);
            lst.Add(reader["LastDateOfService"]);
            lst.Add(reader["MaxStartDateOfService"]);
            lst.Add(reader["MaxValidStartDateOfService"]);
            lst.Add(reader["LastUpdated"]);
            lst.Add(reader["UpdatedBy"]);
        }

        /// <summary>
        /// Converts a sequence of objects to a data table with one column per property of
        /// <typeparamref name="T"/> and one row per item. Nullable property types are mapped
        /// to their underlying type and null values are stored as <see cref="DBNull.Value"/>.
        /// </summary>
        /// <typeparam name="T">Type whose properties define the columns.</typeparam>
        /// <param name="data">The items to convert.</param>
        /// <returns>A new table holding one row per item.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        public DataTable ToDataTable<T>(IEnumerable<T> data)
        {
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            PropertyDescriptorCollection props = TypeDescriptor.GetProperties(typeof(T));

            DataTable table = new DataTable();
            foreach (PropertyDescriptor prop in props)
            {
                table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
            }

            // Rows.Add copies the values into the new row, so one buffer can be reused.
            object[] values = new object[props.Count];
            foreach (T item in data)
            {
                for (int i = 0; i < values.Length; i++)
                {
                    values[i] = props[i].GetValue(item) ?? DBNull.Value;
                }
                table.Rows.Add(values);
            }

            return table;
        }

        /// <summary>
        /// Sends all collected log entries to <c>Analysis.spInsertWorkItemRouteLog</c> as a
        /// single table-valued parameter named <c>@dt</c>.
        /// </summary>
        private void LogMessages()
        {
            using (SqlConnection cn = new SqlConnection(this.logCnString))
            using (DataTable dt = ToDataTable(this.Logs))
            {
                cn.Open();

                Console.WriteLine(string.Format("Logging {0} records for Thread: {1}", this.Logs.Count, this.Name));

                using (SqlCommand cmd = new SqlCommand("Analysis.spInsertWorkItemRouteLog", cn))
                {
                    cmd.CommandType = System.Data.CommandType.StoredProcedure;

                    SqlParameter tableParameter = cmd.Parameters.AddWithValue("@dt", dt);
                    tableParameter.SqlDbType = SqlDbType.Structured;

                    cmd.ExecuteNonQuery();
                }

                Console.WriteLine(string.Format("Logged {0} records for Thread: {1}", this.Logs.Count, this.Name));
            }
        }
    }
} 
+0

我不明白你想要做什麼。當你說「SqlDataReader 對象的 GetValue 上存在爭用」時,你的意思是存在鎖爭用?還是說 GetValue 在你的性能測試中花費的時間最長? –

+0

存在鎖爭用。 – Sean

+0

好的。爲什麼使用 SqlDataReader 需要獲取鎖?爲什麼每個線程只有一個讀取器(reader)? –

回答

-1

SqlDataAdapter 與 SqlDataReader 之間的區別? 答:1. DataReader 在連接環境下工作,而 DataSet 在斷開連接的環境下工作。 2. DataSet 表示數據的內存緩存,由任意數量相互關聯的 DataTable 對象組成。DataTable 對象表示內存中數據的一個表格塊。

SqlDataAdapter or sqlDataReader

0
1. A DataReader works in a connected environment, 
whereas a DataSet works in a disconnected environment. 

2. A DataSet represents an in-memory cache of data consisting of any number of interrelated DataTable objects. A DataTable object represents a tabular block of in-memory data. 

SqlDataAdapter or sqlDataReader