我們注意到,我們的.NET應用程序的內部,我們有競爭,當涉及到使用SqlDataReader的。雖然我們知道SqlDataReader不是ThreadSafe,但它應該縮放。下面的代碼是一個簡單的例子來表明,由於存在對SqlDataReader的GetValue方法競爭,我們不能擴展我們的應用程序。我們不受CPU,磁盤或網絡的限制;只是SqlDataReader的內部爭用。我們可以使用1個線程運行應用程序10次,它可以線性縮放,但1個應用程序中的10個線程不會縮放。有關如何在單個c#應用程序中擴展從SQL Server讀取的任何想法?c。與SqlDataReaders和SqlDataAdapters#線程爭
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using System.Globalization;
namespace ThreadAndSQLTester
{
class Host
{
/// <summary>
/// Gets or sets the receive workers.
/// </summary>
/// <value>The receive workers.</value>
internal List<Worker> Workers { get; set; }
/// <summary>
/// Gets or sets the receive threads.
/// </summary>
/// <value>The receive threads.</value>
internal List<Thread> Threads { get; set; }
public int NumberOfThreads { get; set; }
public int Sleep { get; set; }
public int MinutesToRun { get; set; }
public bool IsRunning { get; set; }
private System.Timers.Timer runTime;
private object lockVar = new object();
public Host()
{
Init(1, 0, 0);
}
public Host(int numberOfThreads, int sleep, int minutesToRun)
{
Init(numberOfThreads, sleep, minutesToRun);
}
private void Init(int numberOfThreads, int sleep, int minutesToRun)
{
this.Workers = new List<Worker>();
this.Threads = new List<Thread>();
this.NumberOfThreads = numberOfThreads;
this.Sleep = sleep;
this.MinutesToRun = minutesToRun;
SetUpTimer();
}
private void SetUpTimer()
{
if (this.MinutesToRun > 0)
{
this.runTime = new System.Timers.Timer();
this.runTime.Interval = TimeSpan.FromMinutes(this.MinutesToRun).TotalMilliseconds;
this.runTime.Elapsed += new System.Timers.ElapsedEventHandler(runTime_Elapsed);
this.runTime.Start();
}
}
void runTime_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
this.runTime.Stop();
this.Stop();
this.IsRunning = false;
}
public void Start()
{
this.IsRunning = true;
Random r = new Random(DateTime.Now.Millisecond);
for (int i = 0; i < this.NumberOfThreads; i++)
{
string threadPoolId = Math.Ceiling(r.NextDouble() * 10).ToString();
Worker worker = new Worker("-" + threadPoolId); //i.ToString());
worker.Sleep = this.Sleep;
this.Workers.Add(worker);
Thread thread = new Thread(worker.Work);
worker.Name = string.Format("WorkerThread-{0}", i);
thread.Name = worker.Name;
this.Threads.Add(thread);
thread.Start();
Debug.WriteLine(string.Format(CultureInfo.InvariantCulture, "Started new Worker Thread. Total active: {0}", i + 1));
}
}
public void Stop()
{
if (this.Workers != null)
{
lock (lockVar)
{
for (int i = 0; i < this.Workers.Count; i++)
{
//Thread thread = this.Threads[i];
//thread.Interrupt();
this.Workers[i].IsEnabled = false;
}
for (int i = this.Workers.Count - 1; i >= 0; i--)
{
Worker worker = this.Workers[i];
while (worker.IsRunning)
{
Thread.Sleep(32);
}
}
foreach (Thread thread in this.Threads)
{
thread.Abort();
}
this.Workers.Clear();
this.Threads.Clear();
}
}
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Data;
using System.Threading;
using System.ComponentModel;
using System.Data.OleDb;
namespace ThreadAndSQLTester
{
class Worker
{
public bool IsEnabled { get; set; }
public bool IsRunning { get; set; }
public string Name { get; set; }
public int Sleep { get; set; }
private string dataCnString { get; set; }
private string logCnString { get; set; }
private List<Log> Logs { get; set; }
public Worker(string threadPoolId)
{
this.Logs = new List<Log>();
SqlConnectionStringBuilder cnBldr = new SqlConnectionStringBuilder();
cnBldr.DataSource = @"trgcrmqa3";
cnBldr.InitialCatalog = "Scratch";
cnBldr.IntegratedSecurity = true;
cnBldr.MultipleActiveResultSets = true;
cnBldr.Pooling = true;
dataCnString = GetConnectionStringWithWorkStationId(cnBldr.ToString(), threadPoolId);
cnBldr = new SqlConnectionStringBuilder();
cnBldr.DataSource = @"trgcrmqa3";
cnBldr.InitialCatalog = "Scratch";
cnBldr.IntegratedSecurity = true;
logCnString = GetConnectionStringWithWorkStationId(cnBldr.ToString(), string.Empty);
IsEnabled = true;
}
private string machineName { get; set; }
private string GetConnectionStringWithWorkStationId(string connectionString, string connectionPoolToken)
{
if (string.IsNullOrEmpty(machineName)) machineName = Environment.MachineName;
SqlConnectionStringBuilder cnbdlr;
try
{
cnbdlr = new SqlConnectionStringBuilder(connectionString);
}
catch
{
throw new ArgumentException("connection string was an invalid format");
}
cnbdlr.WorkstationID = machineName + connectionPoolToken;
return cnbdlr.ConnectionString;
}
public void Work()
{
int i = 0;
while (this.IsEnabled)
{
this.IsRunning = true;
try
{
Log log = new Log();
log.WorkItemId = Guid.NewGuid();
log.StartTime = DateTime.Now;
List<object> lst = new List<object>();
using (SqlConnection cn = new SqlConnection(this.dataCnString))
{
try
{
cn.Open();
using (SqlCommand cmd = new SqlCommand("Analysis.spSelectTestData", cn))
{
cmd.CommandType = System.Data.CommandType.StoredProcedure;
using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) // DBHelper.ExecuteReader(cn, cmd))
{
while (dr.Read())
{
CreateClaimHeader2(dr, lst);
}
dr.Close();
}
cmd.Cancel();
}
}
catch { }
finally
{
cn.Close();
}
}
log.StopTime = DateTime.Now;
log.RouteName = this.Name;
log.HostName = this.machineName;
this.Logs.Add(log);
i++;
if (i > 1000)
{
Console.WriteLine(string.Format("Thread: {0} executed {1} items.", this.Name, i));
i = 0;
}
if (this.Sleep > 0) Thread.Sleep(this.Sleep);
}
catch { }
}
this.LogMessages();
this.IsRunning = false;
}
private void CreateClaimHeader2(IDataReader reader, List<object> lst)
{
lst.Add(reader["ClaimHeaderID"]);
lst.Add(reader["ClientCode"]);
lst.Add(reader["MemberID"]);
lst.Add(reader["ProviderID"]);
lst.Add(reader["ClaimNumber"]);
lst.Add(reader["PatientAcctNumber"]);
lst.Add(reader["Source"]);
lst.Add(reader["SourceID"]);
lst.Add(reader["TotalPayAmount"]);
lst.Add(reader["TotalBillAmount"]);
lst.Add(reader["FirstDateOfService"]);
lst.Add(reader["LastDateOfService"]);
lst.Add(reader["MaxStartDateOfService"]);
lst.Add(reader["MaxValidStartDateOfService"]);
lst.Add(reader["LastUpdated"]);
lst.Add(reader["UpdatedBy"]);
}
/// <summary>
/// Toes the data table.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="data">The data.</param>
/// <returns></returns>
public DataTable ToDataTable<T>(IEnumerable<T> data)
{
PropertyDescriptorCollection props =
TypeDescriptor.GetProperties(typeof(T));
if (props == null) throw new ArgumentNullException("Table properties.");
if (data == null) throw new ArgumentNullException("data");
DataTable table = new DataTable();
for (int i = 0; i < props.Count; i++)
{
PropertyDescriptor prop = props[i];
table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
}
object[] values = new object[props.Count];
foreach (T item in data)
{
for (int i = 0; i < values.Length; i++)
{
values[i] = props[i].GetValue(item) ?? DBNull.Value;
}
table.Rows.Add(values);
}
return table;
}
private void LogMessages()
{
using (SqlConnection cn = new SqlConnection(this.logCnString))
{
try
{
cn.Open();
DataTable dt = ToDataTable(this.Logs);
Console.WriteLine(string.Format("Logging {0} records for Thread: {1}", this.Logs.Count, this.Name));
using (SqlCommand cmd = new SqlCommand("Analysis.spInsertWorkItemRouteLog", cn))
{
cmd.CommandType = System.Data.CommandType.StoredProcedure;
cmd.Parameters.AddWithValue("@dt", dt);
cmd.ExecuteNonQuery();
}
Console.WriteLine(string.Format("Logged {0} records for Thread: {1}", this.Logs.Count, this.Name));
}
finally
{
cn.Close();
}
}
}
}
}
我不明白你想要做什麼。當你說有「上SqlDataReader對象的GetValue爭」,你的意思是有鎖爭用?或者你的意思是GetValue在你的性能測試中花費的時間最長? –
存在鎖爭用。 – Sean
好的。爲什麼我們要獲取鎖以便使用SqlDataReader?爲什麼每個線程只有一個讀卡器? –