我有一個ConcurrentQueue從一個線程獲取對象,另一個線程從中獲取對象並處理它們。帶有小開銷或無鎖的鎖
如果隊列變大,我可以通過刪除重複項來「壓縮」它。壓縮採用隊列並將其放入一個列表中,遍歷它並創建一個只有不同值的新隊列。所以我替換隊列,並且因爲我這樣做,我不能將對象插入到被覆蓋的隊列中,我會將它們釋放。
我的問題是,如果我添加一個鎖(OBJ){}或某種LockHandle我鬆散很多的性能。有很多交易,但處理時間非常短,所以鎖定看起來像是什麼在阻止我的表現。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
namespace A
{
public abstract class Base
{
private ConcurrentQueue<Data> unProcessed = new ConcurrentQueue<Data>();
private const int MIN_COLLAPSETIME = 30;
private const int MIN_COLLAPSECOUNT = 1000;
private QueueCollapser Collapser;
private ManualResetEventSlim waitForCollapsing = new ManualResetEventSlim(true);
private ManualResetEventSlim waitForWrite = new ManualResetEventSlim();
// Thread signal.
public AutoResetEvent unProcessedEvent = new AutoResetEvent(false);
// exiting
public volatile bool Exiting = false;
private Task task;
public BasePusher()
{
// initiate Collapser
Collapser = new QueueCollapser();
// set up thread
task = new Task(
() =>
{
consumerTask();
}, TaskCreationOptions.LongRunning
);
}
public void Start()
{
task.Start();
}
private void consumerTask()
{
Data data = null;
while (!Exiting)
{
try
{
// do we try to collapse
if (unProcessed.Count > MIN_COLLAPSECOUNT && (DateTime.Now - Collapser.LastCollapse).TotalSeconds > MIN_COLLAPSETIME)
{
waitForCollapsing.Reset();
waitForWrite.Wait();
unProcessed = Collapser.Collapse(unProcessed);
waitForCollapsing.Set();
// tried this aswell instead of using my own locking, this is like Monitor.Enter
lock(this) {
unProcessed = Collapser.Collapse(unProcessed);
}
}
if (sum == 0)
{
// we wake the thread after 20 seconds, if nothing is in queue it will just go back and wait
unProcessedEvent.WaitOne(20000);
}
var gotOne = unProcessed.TryDequeue(out data);
if (gotOne)
{
ProcessTime(data);
}
}
}
catch (Exception e)
{
}
}
}
protected abstract void ProcessTime(Data d);
public void AddToQueue(Data d)
{
waitForCollapsing.Wait();
waitForWrite.Reset();
unProcessed.Enqueue(d);
waitForWrite.Set();
unProcessedEvent.Set();
}
// tried this aswell instead of using my own locking, this is like Monitor.Enter
public void AddToQueueAlternate(Data d)
{
lock(this) {
unProcessed.Enqueue(d);
waitForWrite.Set();
unProcessedEvent.Set();
}
}
}
}
這可以不鎖定嗎? 我可以使用更輕量級的鎖嗎?截至目前,只有一個線程添加數據和一個線程讀取。如果這能讓我獲得更好的鎖定,我可以保持這種狀態。
您的消費者太慢了。所以通過讓它看起來重複來減慢速度是沒有意義的。讓製片人去做。 –