2011-08-20 172 views
5

我有一個http處理程序,並將每個請求存儲到內存中的併發隊列集合中。經過一段時間後,我將集合批量插入數據庫。將數據保存在內存中

這是一個壞主意嗎?因爲海量數量很大,所以這似乎是IMO的一個更好的方法。

由於線程的緣故,我確實看到了一些差異(數據庫中的命中數與存儲元素的數量),同時我刷新併發集合,我鎖定並批量插入其內容,然後清空集合。然後從集合中刪除鎖。

有沒有更好的做法?還是你做過類似的事情?

+0

你爲什麼想要這樣做? – Marc

+0

,因爲數據訪問速度很慢,如果我一次插入或執行一個事務。 – DarthVader

+0

對於面臨的線程問題,爲什麼不鎖定集合,將原始集合複製到不同的集合(可以說newCollection),清除並移除原集合上的鎖定,並使用newCollection插入數據庫。採用這種方法,新的請求不會長時間被阻止。 –

回答

1

我已經做了幾乎完全一樣的事情,你用下面的代碼描述。它的線程安全,並有一個刷新方法,你可以調用刷新和掛起寫入。一旦達到要寫入的閾值數量的對象,就會將隊列(我的例子中的列表)發送到另一個線程進行保存。請注意,它使用一個manualResetEvent來處理結尾處的數據刷新(可以等待64個重置事件的限制,所以這就是爲什麼手動等待如果我們有超過64個後臺線程等待寫入,但應該幾乎永遠不會發生,除非你的數據庫真的很慢)。這段代碼被用來處理流入它的數以百萬計的記錄(從內存開始,它花費了大約5分鐘來編寫20m行,但是作爲數據庫在保存服務器上運行,所以沒有網絡跳躍...... SQL當然可以處理數千個的行使用BulkSqlCopy對象和IDataReader),所以它應該處理你的請求加載(但當然這取決於你正在編寫的和你的數據庫,但我認爲代碼是完成任務!)。

另外,爲了便於批量編寫,我創建了一個IDataReader的最小實現來傳輸數據。您需要爲您的請求使用下面的代碼。

public class DataImporter<T> 
{ 

    public DataImporter(string tableName, string readerName) 
    { 
     _tableName = tableName; 
     _readerName = readerName; 
    } 

    /// <summary> 
    /// This is the size of our bulk staging list. 
    /// </summary> 
    /// <remarks> 
    /// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value, 
    /// so records may not be going into the database in sizes of this staging value. 
    /// </remarks> 
    private int _bulkStagingListSize = 20000; 
    private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>(); 
    private string _tableName = String.Empty; 
    private string _readerName = String.Empty; 

    public void QueueForImport(T record) 
    { 
     lock (_listLock) 
     { 
      _items.Add(record); 
      if (_items.Count > _bulkStagingListSize) 
      { 
       SaveItems(_items); 
       _items = new List<T>(); 
      } 
     } 
    } 

    /// <summary> 
    /// This method should be called at the end of the queueing work to ensure to clear down our list 
    /// </summary> 
    public void Flush() 
    { 
     lock (_listLock) 
     { 
      SaveItems(_items); 
      _items = new List<T>(); 
      while (_tasksWaiting.Count > 64) 
      { 
       Thread.Sleep(2000); 
      } 
      WaitHandle.WaitAll(_tasksWaiting.ToArray()); 
     } 
    } 

    private void SaveItems(List<T> items) 
    { 
     ManualResetEvent evt = new ManualResetEvent(false); 
     _tasksWaiting.Add(evt); 
     IDataReader reader = DataReaderFactory.GetReader<T>(_readerName,_items); 
     Tuple<ManualResetEvent, IDataReader> stateInfo = new Tuple<ManualResetEvent, IDataReader>(evt, reader); 
     ThreadPool.QueueUserWorkItem(new WaitCallback(saveData), stateInfo); 

    } 

    private void saveData(object info) 
    { 
     using (new ActivityTimer("Saving bulk data to " + _tableName)) 
     { 
      Tuple<ManualResetEvent, IDataReader> stateInfo = info as Tuple<ManualResetEvent, IDataReader>; 
      IDataReader r = stateInfo.Item2; 
      try 
      { 
       Database.DataImportStagingDatabase.BulkLoadData(r, _tableName); 
      } 
      catch (Exception ex) 
      { 
       //Do something 
      } 
      finally 
      { 
       _tasksWaiting.Remove(stateInfo.Item1); 
       stateInfo.Item1.Set(); 
      } 
     } 
    } 

    private object _listLock = new object(); 

    private List<T> _items = new List<T>(); 
} 

的DataReaderFactory refered下面只是選擇正確的IDataReader implmentation以用於流,看起來如下:

internal static class DataReaderFactory 
{ 
    internal static IDataReader GetReader<T>(string typeName, List<T> items) 
    { 
     IDataReader reader = null; 
     switch(typeName) 
     { 
      case "ProductRecordDataReader": 
       reader = new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader; 
       break; 
      case "RetailerPriceRecordDataReader": 
       reader = new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader; 
       break; 
      default: 
       break; 
     } 
     return reader; 
    } 
} 

,我在這種情況下使用(althoght這段代碼的數據讀取器的實現將工作與任何數據讀取器)如下:

/// <summary> 
/// This class creates a data reader for ProductRecord data. This is used to stream the records 
/// to the SqlBulkCopy object. 
/// </summary> 
public class ProductRecordDataReader:IDataReader 
{ 
    public ProductRecordDataReader(List<ProductRecord> products) 
    { 
     _products = products.ToList(); 
    } 

    List<ProductRecord> _products; 

    int currentRow; 
    int rowCounter = 0; 
    public int FieldCount 
    { 
     get 
     { 
      return 14; 
     } 
    } 


    #region IDataReader Members 

    public void Close() 
    { 
     //Do nothing. 
    } 

    public bool Read() 
    { 
     if (rowCounter < _products.Count) 
     { 
      currentRow = rowCounter; 
      rowCounter++; 
      return true; 
     } 
     else 
     { 
      return false; 
     } 

    } 

    public int RecordsAffected 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public string GetName(int i) 
    { 
     switch (i) 
     { 
      case 0: 
       return "ProductSku"; 
      case 1: 
       return "UPC"; 
      case 2: 
       return "EAN"; 
      case 3: 
       return "ISBN"; 
      case 4: 
       return "ProductName"; 
      case 5: 
       return "ShortDescription"; 
      case 6: 
       return "LongDescription"; 
      case 7: 
       return "DFFCategoryNumber"; 
      case 8: 
       return "DFFManufacturerNumber"; 
      case 9: 
       return "ManufacturerPartNumber"; 
      case 10: 
       return "ManufacturerModelNumber"; 
      case 11: 
       return "ProductImageUrl"; 
      case 12: 
       return "LowestPrice"; 
      case 13: 
       return "HighestPrice"; 
      default: 
       return null; 
     } 

    } 

    public int GetOrdinal(string name) 
    { 
     switch (name) 
     { 
      case "ProductSku": 
       return 0; 
      case "UPC": 
       return 1; 
      case "EAN": 
       return 2; 
      case "ISBN": 
       return 3; 
      case "ProductName": 
       return 4; 
      case "ShortDescription": 
       return 5; 
      case "LongDescription": 
       return 6; 
      case "DFFCategoryNumber": 
       return 7; 
      case "DFFManufacturerNumber": 
       return 8; 
      case "ManufacturerPartNumber": 
       return 9; 
      case "ManufacturerModelNumber": 
       return 10; 
      case "ProductImageUrl": 
       return 11; 
      case "LowestPrice": 
       return 12; 
      case "HighestPrice": 
       return 13; 
      default: 
       return -1; 
     } 

    } 

    public object GetValue(int i) 
    { 
     switch (i) 
     { 
      case 0: 
       return _products[currentRow].ProductSku; 
      case 1: 
       return _products[currentRow].UPC; 
      case 2: 
       return _products[currentRow].EAN; 
      case 3: 
       return _products[currentRow].ISBN; 
      case 4: 
       return _products[currentRow].ProductName; 
      case 5: 
       return _products[currentRow].ShortDescription; 
      case 6: 
       return _products[currentRow].LongDescription; 
      case 7: 
       return _products[currentRow].DFFCategoryNumber; 
      case 8: 
       return _products[currentRow].DFFManufacturerNumber; 
      case 9: 
       return _products[currentRow].ManufacturerPartNumber; 
      case 10: 
       return _products[currentRow].ManufacturerModelNumber; 
      case 11: 
       return _products[currentRow].ProductImageUrl; 
      case 12: 
       return _products[currentRow].LowestPrice; 
      case 13: 
       return _products[currentRow].HighestPrice; 
      default: 
       return null; 
     } 

    } 

    #endregion 

    #region IDisposable Members 

    public void Dispose() 
    { 
     //Do nothing; 
    } 

    #endregion 

    #region IDataRecord Members 

    public bool NextResult() 
    { 
     throw new NotImplementedException(); 
    } 

    public int Depth 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public DataTable GetSchemaTable() 
    { 
     throw new NotImplementedException(); 
    } 

    public bool IsClosed 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public bool GetBoolean(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public byte GetByte(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) 
    { 
     throw new NotImplementedException(); 
    } 

    public char GetChar(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) 
    { 
     throw new NotImplementedException(); 
    } 

    public IDataReader GetData(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public string GetDataTypeName(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public DateTime GetDateTime(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public decimal GetDecimal(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public double GetDouble(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public Type GetFieldType(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public float GetFloat(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public Guid GetGuid(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public short GetInt16(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public int GetInt32(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetInt64(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public string GetString(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public int GetValues(object[] values) 
    { 
     throw new NotImplementedException(); 
    } 

    public bool IsDBNull(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public object this[string name] 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public object this[int i] 
    { 
     get { throw new NotImplementedException(); } 
    } 

    #endregion 
} 

最後批量加載數據的方法如下所示:

public void BulkLoadData(IDataReader reader, string tableName) 
    { 
     using (SqlConnection cnn = new SqlConnection(cnnString)) 
     { 
      SqlBulkCopy copy = new SqlBulkCopy(cnn); 
      copy.DestinationTableName = tableName; 
      copy.BatchSize = 10000; 
      cnn.Open(); 
      copy.WriteToServer(reader); 
     } 
    } 

但是,說了這麼多,我建議你不要在asp.net中使用這段代碼,因爲有人在另一個回答(特別是IIS中的工作進程的回收)中指出了這個原因。我建議你使用一個非常輕的隊列首先發送請求數據到另一個不會重新啓動的服務(我們使用ZeroMQ來將請求和日誌數據從ASP.NET應用程序中寫出來,非常高效)。

Mike。

1

我看到一些差異[...]由於線程

這裏的基本就是用2個隊列和循環他們。 1個用於接收和1個用於插入。你只需要鎖定接收,幾乎沒有爭用。

+0

我使用一個隊列接收比我轉儲列表,我可以做批量插入。我使用併發隊列,所以我不需要鎖定。我只在將內容轉儲列表時鎖定隊列,然後批量插入並刪除鎖定。你怎麼看?我可以做得更好嗎? – DarthVader

+0

好吧,複製到列表比db插入更快,但仍然有2個隊列可能會更好。這取決於。併發隊列將使切換有點棘手。 –

+0

ConcurrentQueue存在一個問題,你是否讓它變得非常大?見http://connect.microsoft.com/VisualStudio/feedback/details/552868/memory-leak-in-concurrentqueue-t-class-dequeued-enteries-are-still-rooted –

2

對不起,但我會說這是一個壞主意。主要有以下問題:

  • 如果應用程序池回收的數據寫入到數據庫之前被插入數據時,你將失去數據
  • 保持所有數據的相同集合中導致需要鎖定該集合並且當數據被寫入磁盤並且集合被清除時。這可能會導致整個站點在批量插入過程中暫停。
  • 隨着額外的步驟,您的代碼將更加複雜。修復線程問題很難

我們已經編寫了Web應用程序,它在峯值負載下每秒向SQL Server數據庫寫入1000行數據。

嘗試儘可能簡單地寫入您的應用程序,然後進行性能測試。

上,你可以插入到數據庫中很多依賴於你的硬件,但也有東西,你可以在你的程序做的速度:

  • 只對表中的一個索引(clustered)。主要自動編號。
  • 請確保您儘快釋放到數據庫的連接。
+0

我爲了解你的要點和我的上述鎖定。你是怎麼每秒向sql server寫入1000行的?你可以做每秒10000行sql服務器? – DarthVader

0

你可以做的其他事情發送到磁盤上的數據庫像sqlite(以避免池recicle問題),並將其發送到您的sql服務器數據庫。

我使用了反應擴展來創建插入隊列並以很好的速度工作。