2

我需要實現可以從多個線程填充請求的隊列。當這個隊列變得大於1000個完成的請求時,這個請求應該被存儲到數據庫中。這是我的實現:如何從ConcurrentQueue消耗chuncks正確

public class RequestQueue 
{ 
    private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>(); 
    private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>(); 

    private static volatile bool isLoading = false; 
    private static object _lock = new object(); 

    public static void Launch() 
    { 
     Task.Factory.StartNew(execute); 
    } 

    public static void Add(VerificationRequest request) 
    { 
     _queue.Add(request); 
    } 

    public static void AddRange(List<VerificationRequest> requests) 
    { 
     Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3}, 
      (request) => { _queue.Add(request); }); 
    } 


    private static void execute() 
    { 
     Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest); 
    } 

    private static void EnqueueSaveRequest(VerificationRequest request) 
    { 
     _storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request)); 
     if (_storageQueue.Count > 1000 && !isLoading) 
     { 
      lock (_lock) 
      { 
       if (_storageQueue.Count > 1000 && !isLoading) 
       { 
        isLoading = true; 

        var requestChunck = new List<VerificationRequest>(); 
        VerificationRequest req; 
        for (var i = 0; i < 1000; i++) 
        { 
         if(_storageQueue.TryDequeue(out req)) 
          requestChunck.Add(req); 
        } 
        new VerificationRequestRepository().InsertRange(requestChunck); 

        isLoading = false; 
       } 
      } 
     }    
    } 
} 

有沒有什麼辦法來實現這個沒有鎖和isLoading?

+0

爲什麼你不希望使用鎖?我的意思是在這種情況下似乎不會影響性能。 – Evk

+0

我同意,但也許有更好的辦法。另外我不確定我是否實現了正確加載isLoading的鎖定 – xalz

+0

爲什麼你甚至需要'isLoading'?如果您只是將其刪除,會發生什麼變化? – zerkms

回答

3

做你問什麼,最簡單的方法是在TPL Dataflow庫使用的塊。例如

var batchBlock = new BatchBlock<VerificationRequest>(1000); 
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{ 
       new VerificationRequestRepository().InsertRange(records); 
}; 

batchBlock.LinkTo(exportBlock , new DataflowLinkOptions { PropagateCompletion = true }); 

就是這樣。

您可以將消息發送到起始塊與

batchBlock.Post(new VerificationRequest(...)); 

一旦你完成你的工作,你可以取下來的整條管線,並通過調用batchBlock.Complete();刷新任何剩餘的消息,並等待進行最後的塊來完成:

batchBlock.Complete(); 
await exportBlock.Completion; 

BatchBlock批高達1000所記錄到的1000個項目陣列,並將它們傳遞到下一個塊。一個ActionBlock默認只使用1個任務,所以它是線程安全的。你可以使用你的資料庫的現有實例,而不必擔心跨線程訪問:

var repository=new VerificationRequestRepository(); 
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{ 
       repository.InsertRange(records); 
}; 

幾乎所有的塊具有並行輸入緩衝區。每個塊都運行在自己的TPL任務上,因此每個步驟都可以同時運行。這意味着你異步執行「免費」,如果你有多個鏈接的步驟也很重要,比如你用TransformBlock修改流經管道的消息。

我使用這種管道創建調用外部服務管道,解析反應,生成最終的記錄,批,並結合使用SqlBulkCopy的塊發送到數據庫。