我正在從歷史數據庫接收大約50 Hz的數據。我想把它流出10 Hz左右的另一端。 爲了實現這一點,我產生了兩個定時器,一個用於從歷史數據庫中獲取數據(我運行這個速度比發送定時器快兩倍)。第二個定時器是200ms(10Hz)。 第一個計時器存儲它在BlockingCollection中獲得的每個值(我也試過ConcurrentQueue)。由於兩個不同的線程正在讀取/寫入集合,因此我無法使用正常的列表/隊列。 與此相關的挑戰是,它甚至不會給我一個50Hz。我預計每次調用都會得到5個值,但它會以突發方式給我提供數據(即有時沒有,有時15個)。因此,我將BlockingCollection作爲發送計時器從中獲取數據的緩衝區。 發送計時器使用遞歸方法獲取列表中間的數據。它檢查是否發送數值,如果已經是(50Hz - > 10Hz),則獲取下一個第5個數值,如果沒有發送數值。 現在解決問題;遞歸方法有時會兩次發送相同的值,即使我鎖定了對象。我們已經指出這個問題是鎖定不能按預期工作,但我們不知道爲什麼,或者如何解決這個問題。避免從BlockingCollection獲取重複值,並使用單獨的線程
有什麼建議嗎? 附加的代碼不是真實的代碼,但它說明了問題,並且實際上給你重複的值,雖然不是很經常。
class Program {
private readonly ConcurrentDictionary<int, BlockingCollection<TestObject>> _pipes = new ConcurrentDictionary<int, BlockingCollection<TestObject>>();
private const int Interval = 5;
private Timer inTimer;
private Timer outTimer;
static void Main() {
Program program = new Program();
program.Run();
Console.ReadLine();
}
private void Run() {
_pipes[100] = new BlockingCollection<TestObject>();
_pipes[200] = new BlockingCollection<TestObject>();
_pipes[300] = new BlockingCollection<TestObject>();
inTimer = new Timer(InTimer, null, 0, 100);
Thread.Sleep(1000);
outTimer = new Timer(OutTimer, null, 0, 200);
}
private void OutTimer(object state) {
foreach (TestObject testObject in GetNotSentTestObjects()) {
Console.WriteLine("{0};{1};{2}", testObject.PipeId, testObject.Timestamp.ToString("o"), testObject.Value);
}
}
private IEnumerable<TestObject> GetNotSentTestObjects() {
List<TestObject> testObjects = new List<TestObject>();
foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
TestObject testObject = GetLatestTestObjectNotSent(pipe.Key, (pipe.Value.Count/2));
if (testObject == null) {
return null;
}
testObjects.Add(testObject);
}
return testObjects;
}
private TestObject GetLatestTestObjectNotSent(int key, int locationInList) {
BlockingCollection<TestObject> pipe = _pipes[key];
TestObject testObject;
lock (pipe) {
testObject = pipe.ElementAt(locationInList - 1);
if (testObject.Sent) {
int nextLocationInList = locationInList + Interval;
GetLatestTestObjectNotSent(key, nextLocationInList);
}
testObject.Sent = true;
}
testObject.PipeId = key;
return testObject;
}
private void InTimer(object sender) {
Random random = new Random();
for (int i = 0; i < random.Next(0,20); i++) {
foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
pipe.Value.Add(new TestObject());
}
i++;
}
}
}
internal class TestObject {
public DateTime Timestamp { get; set; }
public string Value { get; set; }
public bool Sent { get; set; }
public int PipeId;
public TestObject() {
Value = Guid.NewGuid().ToString().Substring(0, 8);
Timestamp = DateTime.Now;
}
}
請注意,Thread.Sleep是讓列表被填充。在我的代碼中,我也有一段時間後修剪列表的代碼。
你確定你不想使用[BlockingCollection.GetConsumingEnumerable(HTTP:// MSDN。 microsoft.com/en-us/library/dd287186.aspx)從它讀取? – 2013-03-18 20:25:30
你應該知道'Thread.Sleep(1000)'讓你的線程至少休眠1秒,但它可能會休眠2或3或OS決定的任何秒數。 – Rafael 2013-03-18 20:38:01