我需要實現可以從多個線程填充請求的隊列。當這個隊列變得大於1000個完成的請求時,這個請求應該被存儲到數據庫中。這是我的實現:如何從ConcurrentQueue消耗chuncks正確
public class RequestQueue
{
private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
private static volatile bool isLoading = false;
private static object _lock = new object();
public static void Launch()
{
Task.Factory.StartNew(execute);
}
public static void Add(VerificationRequest request)
{
_queue.Add(request);
}
public static void AddRange(List<VerificationRequest> requests)
{
Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
(request) => { _queue.Add(request); });
}
private static void execute()
{
Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest);
}
private static void EnqueueSaveRequest(VerificationRequest request)
{
_storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request));
if (_storageQueue.Count > 1000 && !isLoading)
{
lock (_lock)
{
if (_storageQueue.Count > 1000 && !isLoading)
{
isLoading = true;
var requestChunck = new List<VerificationRequest>();
VerificationRequest req;
for (var i = 0; i < 1000; i++)
{
if(_storageQueue.TryDequeue(out req))
requestChunck.Add(req);
}
new VerificationRequestRepository().InsertRange(requestChunck);
isLoading = false;
}
}
}
}
}
有沒有什麼辦法來實現這個沒有鎖和isLoading?
爲什麼你不希望使用鎖?我的意思是在這種情況下似乎不會影響性能。 – Evk
我同意,但也許有更好的辦法。另外我不確定我是否實現了正確加載isLoading的鎖定 – xalz
爲什麼你甚至需要'isLoading'?如果您只是將其刪除,會發生什麼變化? – zerkms