2013-03-18 18 views
2

我正在從歷史數據庫接收大約50 Hz的數據。我想把它流出10 Hz左右的另一端。 爲了實現這一點,我產生了兩個定時器,一個用於從歷史數據庫中獲取數據(我運行這個速度比發送定時器快兩倍)。第二個定時器是200ms(10Hz)。 第一個計時器存儲它在BlockingCollection中獲得的每個值(我也試過ConcurrentQueue)。由於兩個不同的線程正在讀取/寫入集合,因此我無法使用正常的列表/隊列。 與此相關的挑戰是,它甚至不會給我一個50Hz。我預計每次調用都會得到5個值,但它會以突發方式給我提供數據(即有時沒有,有時15個)。因此,我將BlockingCollection作爲發送計時器從中獲取數據的緩衝區。 發送計時器使用遞歸方法獲取列表中間的數據。它檢查是否發送數值,如果已經是(50Hz - > 10Hz),則獲取下一個第5個數值,如果沒有發送數值。 現在解決問題;遞歸方法有時會兩次發送相同的值,即使我鎖定了對象。我們已經指出這個問題是鎖定不能按預期工作,但我們不知道爲什麼,或者如何解決這個問題。避免從BlockingCollection獲取重複值,並使用單獨的線程

有什麼建議嗎? 附加的代碼不是真實的代碼,但它說明了問題,並且實際上給你重複的值,雖然不是很經常。

class Program { 

    private readonly ConcurrentDictionary<int, BlockingCollection<TestObject>> _pipes = new ConcurrentDictionary<int, BlockingCollection<TestObject>>(); 
    private const int Interval = 5; 

    private Timer inTimer; 
    private Timer outTimer; 

    static void Main() { 
     Program program = new Program(); 
     program.Run(); 
     Console.ReadLine(); 
    } 

    private void Run() { 
     _pipes[100] = new BlockingCollection<TestObject>(); 
     _pipes[200] = new BlockingCollection<TestObject>(); 
     _pipes[300] = new BlockingCollection<TestObject>(); 

     inTimer = new Timer(InTimer, null, 0, 100); 
     Thread.Sleep(1000); 
     outTimer = new Timer(OutTimer, null, 0, 200); 
    } 

    private void OutTimer(object state) { 
     foreach (TestObject testObject in GetNotSentTestObjects()) { 
      Console.WriteLine("{0};{1};{2}", testObject.PipeId, testObject.Timestamp.ToString("o"), testObject.Value); 
     } 
    } 

    private IEnumerable<TestObject> GetNotSentTestObjects() { 
     List<TestObject> testObjects = new List<TestObject>(); 

     foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) { 
      TestObject testObject = GetLatestTestObjectNotSent(pipe.Key, (pipe.Value.Count/2)); 
      if (testObject == null) { 
       return null; 
      } 
      testObjects.Add(testObject); 
     } 
     return testObjects; 
    } 

    private TestObject GetLatestTestObjectNotSent(int key, int locationInList) { 

     BlockingCollection<TestObject> pipe = _pipes[key]; 
     TestObject testObject; 
     lock (pipe) { 
      testObject = pipe.ElementAt(locationInList - 1); 

      if (testObject.Sent) { 
       int nextLocationInList = locationInList + Interval; 
       GetLatestTestObjectNotSent(key, nextLocationInList); 
      } 
      testObject.Sent = true; 
     } 
     testObject.PipeId = key; 
     return testObject; 
    } 

    private void InTimer(object sender) { 
     Random random = new Random(); 
     for (int i = 0; i < random.Next(0,20); i++) { 
      foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) { 
       pipe.Value.Add(new TestObject()); 
      } 
      i++; 
     } 
    } 
} 

internal class TestObject { 
    public DateTime Timestamp { get; set; } 
    public string Value { get; set; } 
    public bool Sent { get; set; } 
    public int PipeId; 

    public TestObject() { 
     Value = Guid.NewGuid().ToString().Substring(0, 8); 
     Timestamp = DateTime.Now; 
    } 
} 

請注意,Thread.Sleep是讓列表被填充。在我的代碼中,我也有一段時間後修剪列表的代碼。

+0

你確定你不想使用[BlockingCollection.GetConsumingEnumerable(HTTP:// MSDN。 microsoft.com/en-us/library/dd287186.aspx)從它讀取? – 2013-03-18 20:25:30

+0

你應該知道'Thread.Sleep(1000)'讓你的線程至少休眠1秒,但它可能會休眠2或3或OS決定的任何秒數。 – Rafael 2013-03-18 20:38:01

回答

0

頭撲的很多經過事實證明我需要返回時,我用遞歸的方法...

0

同一個線程可以多次獲取相同的鎖。看到這個例子

object o = new object(); 
lock (o) lock (o) Console.WriteLine("Acquired lock two times"); 

您應該使用其他同步原語像互斥,信號量的AutoResetEvent等..

1

一個BlockingCollection是隊列。它不是爲隨機訪問而設計的,試圖隨機訪問它將會導致你無法完成任何事情。你也不會從隊列中移除項目,這意味着最終你會耗盡內存。

您可能遇到的一個問題是,儘管您在鎖定列表時使用ElementAt訪問該列表,但添加項目時並未鎖定它。但是鎖定併發數據結構的整個想法應該讓你重新考慮你的設計。您不應該永遠不必鎖定併發數據結構。

據我所知,你想選擇每一個尚未處理的第五個項目。如果你不想丟棄未處理的項目(即項目1-4),那麼這將是一個相當困難的問題。如果你放棄這些項目,那麼它的調用Take五倍的一個簡單的問題:

TestObject o; 
for (int i = 0; i < 5; ++i) 
{ 
    o = pipe.Take(); 
} 
// You now have the 5th item from the queue 
// The other items have been discarded 

如果你想保持這些項目周圍可能的處理後,你必須將它們添加回隊列莫名其妙。但是他們會到隊列的盡頭,這意味着你會在較新的隊列之前處理舊的隊列。

另外,在某些時候,您將填滿隊列,因爲製作者正在添加的東西比他們正在消耗的更快。

您需要進一步思考如何讓這個應用程序工作,或者更好地向我們解釋它。您所描述的內容和您發佈的代碼都很困惑,並且使我們無法提供良好的建議。