2

下面是場景: 我想從服務器的數據表插入到服務器B.多線程SqlDataReader的過期時間

的數據是非常巨大的,所以我會用SqlBulkCopy的做的工作。

這裏是我的想法:

製片人:讓所有頁碼

消費:從服務器A獲得頁面數,頁面數據並寫入 DataReader的服務器

問題我遇到過:

當運行時間很長時,例外情況如下:

Timeout expired. The timeout period elapsed prior to completion of the operation 


[SqlException (0x80131904): Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding.] System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection) +1948826 
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection) +4844747 
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj) +194 
System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj) +2392 
System.Data.SqlClient.SqlDataReader.ConsumeMetaData() +33 
System.Data.SqlClient.SqlDataReader.get_MetaData() +83 
System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString) +297 
System.Data.SqlClient.SqlCommand.RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, Boolean async) +954 
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method, DbAsyncResult result) +162 
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method) +32 
System.Data.SqlClient.SqlCommand.ExecuteReader(CommandBehavior behavior, String method) +141 
System.Data.SqlClient.SqlCommand.ExecuteDbDataReader(CommandBehavior behavior) +12 
System.Data.Common.DbCommand.ExecuteReader() +12 
System.Data.Linq.SqlClient.SqlProvider.Execute(Expression query, QueryInfo queryInfo, IObjectReaderFactory factory, Object[] parentArgs, Object[] userArgs, ICompiledSubQuery[] subQueries, Object lastResult) +332 

我找不到任何錯誤的代碼。

這裏是我的代碼:

的Program.cs

class Program 
{ 
    static BlockingCollection<int> pcCollection = new BlockingCollection<int>(); 
    static string strRemoteConn = ConfigurationManager.AppSettings["RemoteConnStr"]; 
    static string strLocalConn = ConfigurationManager.AppSettings["LocalConnStr"]; 
    static string strCommandSql = ConfigurationManager.AppSettings["CommandSQL"]; 
    static string strTableName = ConfigurationManager.AppSettings["TableName"]; 
    static int batchSize = Int32.Parse(ConfigurationManager.AppSettings["CommitBatchSize"]); 
    static int taskCount = Int32.Parse(ConfigurationManager.AppSettings["TaskCount"]); 

    static object s_consumer = new object(); 

    static void Main(string[] args) 
    { 
     try 
     { 
      var watch = Stopwatch.StartNew(); 

      var tableCount = 0D; 

      using (var connection = new SqlConnection(strRemoteConn)) 
      using (SqlCommand cmd = connection.CreateCommand()) 
      { 
       connection.Open(); 
       cmd.CommandText = string.Format(@"SELECT 
                Total_Rows= SUM(st.row_count) 
               FROM 
                sys.dm_db_partition_stats st 
               WHERE 
                object_name(object_id) = '{0}' AND (index_id < 2)", strTableName); 
       cmd.CommandTimeout = 300; 

       tableCount = Double.Parse(cmd.ExecuteScalar().ToString()); 
      } 

      var totalPages = (int)Math.Ceiling(tableCount/batchSize); 

      var listPageRn = Enumerable.Range(1, totalPages); 

      var listPartPage = listPageRn.Split(taskCount).ToList(); 

      var listProducerTask = new List<Task>(); 
      var listConsumerTask = new List<Task>(); 

      var consumerTask = taskCount; 

      for (int i = 1; i <= consumerTask; i++) 
      { 
       var taskFlag = i; 
       var consumer = Task.Factory.StartNew(() => 
       { 
        ConsumerAction(taskFlag.ToString()); 

       }, TaskCreationOptions.LongRunning); 

       listConsumerTask.Add(consumer); 
      } 

      var producerTaskIndex = 1; 
      foreach (var item in listPartPage) 
      { 
       var tmpIndex = producerTaskIndex.ToString(); 
       var producer = Task.Factory.StartNew(() => 
       { 
        ProducerAction(item, tmpIndex); 
       }); 

       listProducerTask.Add(producer); 
       producerTaskIndex++; 
      } 

      Task.WaitAll(listProducerTask.ToArray()); 
      pcCollection.CompleteAdding(); 
      Task.WaitAll(listConsumerTask.ToArray()); 

      watch.Stop(); 
      var mins = watch.ElapsedMilliseconds/1000/60; 
      Console.WriteLine("All Batch Insert Time Elapsed:\t {0} mins", mins); 
     } 
     catch (AggregateException ex) 
     { 
      using (StreamWriter writer = File.AppendText("BatchError.txt")) 
      { 
       writer.WriteLine("Error Time: {0}", DateTime.Now); 
       foreach (var exception in ex.InnerExceptions) 
       { 
        writer.WriteLine("Error: {0}", exception.Message); 
        writer.WriteLine("Source: {0}", exception.Source); 
        writer.WriteLine("Track: {0}", exception.StackTrace); 
       } 
      } 
      throw; 

     } 
     catch (Exception ex) 
     { 
      using (StreamWriter writer = File.AppendText("BatchError.txt")) 
      { 
       writer.WriteLine("Error Time: {0}", DateTime.Now); 
       writer.WriteLine("Error: {0}", ex.Message); 
       writer.WriteLine("Source: {0}", ex.Source); 
       writer.WriteLine("Track: {0}", ex.StackTrace); 
      } 
      throw; 
     } 

     Console.ReadLine(); 
    } 

    static void ProducerAction(IEnumerable<int> source, string taskFlag = "1") 
    { 
     foreach (var item in source) 
     { 
      Console.WriteLine("Producer-{0} processing item batch {1}", taskFlag, item); 

      pcCollection.Add(item); 
     } 
    } 

    static void ConsumerAction(string taskFlag = "") 
    { 
     foreach (var item in pcCollection.GetConsumingEnumerable()) 
     { 
      Console.WriteLine("consumer-{0} processing item", taskFlag); 
      var processing = new ManageBatchProcessing 
      { 
       LocalConnStr = strLocalConn, 
       RemoteConnStr = strRemoteConn, 
       BatchSize = batchSize, 
       TableName = strTableName, 
       CommandSql = strCommandSql 
      }; 

      processing.ProcessDatabase(item); 
     } 
    } 

ManageBatchProcessing.cs

public class ManageBatchProcessing 
{ 
    public string LocalConnStr { get; set; } 
    public string RemoteConnStr { get; set; } 

    public string CommandSql { get; set; } 
    public int BatchSize { get; set; } 
    public string TableName { get; set; } 

    public void ProcessDatabase(int item) 
    { 
     var watch = new Stopwatch(); 
     watch.Start(); 

     var start = (item - 1) * this.BatchSize + 1; 
     var end = item * this.BatchSize; 
     var strCommandSql = string.Format(this.CommandSql, start, end); 
     using (var remoteConn = new SqlConnection(this.RemoteConnStr)) 
     using (var localConn = new SqlConnection(this.LocalConnStr)) 
     { 
      remoteConn.Open(); 
      localConn.Open(); 

      using (var command = new SqlCommand(strCommandSql, remoteConn)) 
      using (var dataReader = command.ExecuteReader()) 
      { 
       command.CommandTimeout = 0; 
       using (var bulkCopy = new SqlBulkCopy(localConn)) 
       { 
        bulkCopy.DestinationTableName = this.TableName; 
        bulkCopy.BulkCopyTimeout = 0; 
        bulkCopy.WriteToServer(dataReader); 
        bulkCopy.Close(); 
       } 
      } 

      remoteConn.Close(); 
      localConn.Close(); 
     } 

     watch.Stop(); 

     var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
     Console.WriteLine("\t\t\t -------------------------------------------------"); 
     Console.ForegroundColor = ConsoleColor.Green; 
     Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##")); 
     Console.ResetColor(); 
     Console.WriteLine("\t\t\t -------------------------------------------------"); 
    } 

    public DataTable RetriveToDatabase(int item) 
    { 
     var start = (item - 1) * this.BatchSize + 1; 
     var end = item * this.BatchSize; 
     var dataTable = new DataTable(); 

     using (var connection = new SqlConnection(this.RemoteConnStr)) 
     { 
      var watch = new Stopwatch(); 
      watch.Start(); 

      using (var adapter = new SqlDataAdapter(string.Format(this.CommandSql, start, end), connection)) 
      { 
       adapter.SelectCommand.CommandTimeout = 3600; 
       adapter.Fill(dataTable); 
      } 

      watch.Stop(); 

      var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      Console.ForegroundColor = ConsoleColor.Red; 
      Console.WriteLine("\t\t\t convert datareader to table done {0} s", totalSeconds.ToString("#.##")); 
      Console.ResetColor(); 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      return dataTable; 
     } 
    } 

    public void WriteToDatabase(IDataReader reader) 
    { 
     using (var connection = new SqlConnection(this.LocalConnStr)) 
     { 
      var watch = new Stopwatch(); 
      watch.Start(); 

      connection.Open(); 

      using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null)) 
      { 
       bulkCopy.DestinationTableName = this.TableName; 
       bulkCopy.BulkCopyTimeout = 0; 
       bulkCopy.WriteToServer(reader); 
      } 

      connection.Close(); 

      watch.Stop(); 

      var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      Console.ForegroundColor = ConsoleColor.Green; 
      Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##")); 
      Console.ResetColor(); 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
     } 
    } 

    public void WriteToDatabase(DataTable dataTable) 
    { 
     using (var connection = new SqlConnection(this.LocalConnStr)) 
     { 
      var watch = new Stopwatch(); 
      watch.Start(); 

      connection.Open(); 

      using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null)) 
      { 
       bulkCopy.DestinationTableName = this.TableName; 
       bulkCopy.BulkCopyTimeout = 0; 
       bulkCopy.WriteToServer(dataTable); 
      } 

      connection.Close(); 

      watch.Stop(); 

      var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      Console.ForegroundColor = ConsoleColor.Green; 
      Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##")); 
      Console.ResetColor(); 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
     } 
    } 

} 
+0

你們是不是要執行[複製](https://msdn.microsoft.com/en-us/library/ms151198.aspx)? – Prisoner

+0

你可以這麼說。但數據至少有一千萬或億。 –

+0

如果複製可以適合,請嘗試使用複製而不是手動工作。 – Prisoner

回答

0

我終於知道什麼是錯我的代碼。

command.CommandTimeout = 0; 

是錯誤的地方。

這裏是正確的版本:

using (var command = new SqlCommand(strCommandSql, remoteConn)) 
      { 
       command.CommandTimeout = 0; 

       using (var dataReader = command.ExecuteReader()) 
       { 
        using (var bulkCopy = new SqlBulkCopy(this.LocalConnStr, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.UseInternalTransaction)) 
        { 
         bulkCopy.DestinationTableName = this.TableName; 
         bulkCopy.BulkCopyTimeout = 0; 
         bulkCopy.WriteToServer(dataReader); 
         bulkCopy.Close(); 
        } 
       } 
      }