2010-04-06 114 views
1

我面臨兩難多線程情況下使用SqlBulkCopy的(!)。與線程池的問題

在第一場景中,我實現從一個數據的基礎上覆制數據的解決方案到另一個使用SqlBulkCopy的同步和我沒有問題的。

現在,使用ThreadPool,我實現了一個assynchronously情況下,每個表中的線程一樣的,一切工作正常,但過去一段時間(usualy1小時因爲複製操作需要大約同一時間),操作發送到ThreadPool停止執行。有一個diferent SQLBulkCopy使用每線程一個diferent SQLConnection

我已經看到的空閒線程數,他們是在調用開始全部免費。我有一個AutoResetEvent等待線程在再次啓動之前完成其工作,還有一個信號量FIFO用於保存活動線程的計數器。

有沒有辦法,我已經忘記了,或者使用SqlBulkCopy的時候,我應該avaliate一些問題?我很欣賞一些幫助,因爲我的想法了;)


- >使用

SemaphoreFIFO waitingThreads = new SemaphoreFIFO(); 
AutoResetEvent autoResetEvent = new AutoResetEvent(false); 
(...) 
List<TableMappingHolder> list = BulkCopy.Mapping(tables); 
waitingThreads.Put(list.Count, 300000); 

for (int i = 0; i < list.Count; i++){ 
    ThreadPool.QueueUserWorkItem(call => 
     //Replication 
     (...) 
     waitingThreads.Get(); 

     if (waitingThreads.Counter == 0) 
      autoResetEvent.Set(); 
    ); 
} 

bool finalized = finalized = autoResetEvent.WaitOne(300000); 
(...) 

//批量複製

public bool SetData(SqlDataReader reader, string _destinationTableName, List<SqlBulkCopyColumnMapping> _sqlBulkCopyColumnMappings) 
     { 
      using (SqlConnection destinationConnection = 
          new SqlConnection(ConfigurationManager.ConnectionStrings["dconn"].ToString())) 
      { 
       destinationConnection.Open(); 

       // Set up the bulk copy object. 
       // Note that the column positions in the source 
       // data reader match the column positions in 
       // the destination table so there is no need to 
       // map columns. 
       using (SqlBulkCopy bulkCopy = 
          new SqlBulkCopy(destinationConnection))     { 
        bulkCopy.BulkCopyTimeout = 300000; 
        bulkCopy.DestinationTableName = _destinationTableName; 

        // Set up the column mappings by name. 
        foreach (SqlBulkCopyColumnMapping columnMapping in _sqlBulkCopyColumnMappings) 
         bulkCopy.ColumnMappings.Add(columnMapping); 

        try{ 
         // Write from the source to the destination. 
         bulkCopy.WriteToServer(reader); 
        } 
        catch (Exception ex){return false;} 
        finally 
        { 
         try{reader.Close();} 
         catch (Exception e){//log} 
         try{bulkCopy.Close();} 
         catch (Exception e){//log} 
         try{destinationConnection.Close(); } 
         catch (Exception e){ //log } 
        } 
       } 
      } 
      return true; 
     } 

信號燈

public sealed class SemaphoreFIFO 
{ 
    private int _counter; 
    private readonly LinkedList<int> waitQueue = new LinkedList<int>(); 

    public int Counter 
    { 
     get { return _counter; } 
    } 

    private void internalNotify() 
    { 
     if (waitQueue.Count > 0 && _counter == 0) 
     { 
      Monitor.PulseAll(waitQueue); 
     } 
    } 

    public void Get() 
    { 
     lock (waitQueue) 
     { 
      _counter --; 
      internalNotify(); 
     } 
    } 

    public bool Put(int n, int timeout) 
    { 
     if (timeout < 0 && timeout != Timeout.Infinite) 
      throw new ArgumentOutOfRangeException("timeout"); 
     if (n < 0) 
      throw new ArgumentOutOfRangeException("n"); 

     lock (waitQueue) 
     { 
      if (waitQueue.Count == 0 && _counter ==0) 
      { 
       _counter +=n; 
       internalNotify(); 
       return true; 
      } 

      int endTime = Environment.TickCount + timeout; 
      LinkedListNode<int> me = waitQueue.AddLast(n); 
      try 
      { 
       while (true) 
       { 
        Monitor.Wait(waitQueue, timeout); 

        if (waitQueue.First == me && _counter ==0) 
        { 
         _counter += n; 
         waitQueue.RemoveFirst(); 
         internalNotify(); 
         return true; 
        } 

        if (timeout != Timeout.Infinite) 
        { 
         int remainingTime = endTime - Environment.TickCount; 
         if (remainingTime <= 0) 
         { 
          // TIMEOUT 
          if (waitQueue.First == me) 
          { 
           waitQueue.RemoveFirst(); 
           internalNotify(); 
          } 
          else 
           waitQueue.Remove(me); 
          return false; 
         } 
         timeout = remainingTime; 
        } 
       } 
      } 
      catch (ThreadInterruptedException e) 
      { 
       // INTERRUPT 
       if (waitQueue.First == me) 
       { 
        waitQueue.RemoveFirst(); 
        internalNotify(); 
       } 
       else 
        waitQueue.Remove(me); 
       throw e; 
      } 
     } 
    } 
} 

回答

0

我只想回到同步使用SQLBulkCopy。我不確定你在同一時間做了一堆批量拷貝(而不是一個接一個地獲得)。它可以完成一切更快一點,但我甚至不確定這一點。

+0

在我的生產環境中它是同步的,但我想要的是所有表中的數據幾乎在同一時間寫入,因爲它將被另一個應用程序使用 – Soulbe 2010-04-06 17:52:50

+1

我的猜測是你不會獲得太多(如果有的話)從這種方法。我認爲如果你同時運行兩個批量拷貝,每個拷貝的運行速度大約是單獨運行的一半,所以淨完成時間將是相同的。我從來沒有聽說過這個正在做的(多個同時批量插入),並且它可能是你的線程停止執行,因爲你正在運行到一個基本的限制如何可以批量插入可以在同一時間內完成。 – MusiGenesis 2010-04-06 18:51:34

+0

感謝您的評論。事實上,如果我評論批量插入並讓其他數據訪問,應用程序工作正常! – Soulbe 2010-04-07 08:47:04