2014-05-17 21 views
0

我有一個ConcurrentQueue從一個線程獲取對象,另一個線程從中獲取對象並處理它們。帶有小開銷或無鎖的鎖

如果隊列變大,我可以通過刪除重複項來「壓縮」它。壓縮採用隊列並將其放入一個列表中,遍歷它並創建一個只有不同值的新隊列。所以我替換隊列,並且因爲我這樣做,我不能將對象插入到被覆蓋的隊列中,我會將它們釋放。

我的問題是,如果我添加一個鎖(OBJ){}或某種LockHandle我鬆散很多的性能。有很多交易,但處理時間非常短,所以鎖定看起來像是什麼在阻止我的表現。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 
using System.IO; 

namespace A 
{ 
    public abstract class Base 
    { 
     private ConcurrentQueue<Data> unProcessed = new ConcurrentQueue<Data>(); 

     private const int MIN_COLLAPSETIME = 30; 
     private const int MIN_COLLAPSECOUNT = 1000; 
     private QueueCollapser Collapser; 
     private ManualResetEventSlim waitForCollapsing = new ManualResetEventSlim(true); 
     private ManualResetEventSlim waitForWrite = new ManualResetEventSlim(); 

     // Thread signal. 
     public AutoResetEvent unProcessedEvent = new AutoResetEvent(false); 
     // exiting 
     public volatile bool Exiting = false; 

     private Task task; 

     public BasePusher() 
     { 
      // initiate Collapser 
      Collapser = new QueueCollapser(); 
      // set up thread 
      task = new Task(
       () => 
       { 
        consumerTask(); 
       }, TaskCreationOptions.LongRunning 
       ); 

     } 

     public void Start() 
     { 
      task.Start(); 
     } 

     private void consumerTask() 
     { 
      Data data = null; 
      while (!Exiting) 
      { 
       try 
       { 
         // do we try to collapse 
         if (unProcessed.Count > MIN_COLLAPSECOUNT && (DateTime.Now - Collapser.LastCollapse).TotalSeconds > MIN_COLLAPSETIME) 
         { 
          waitForCollapsing.Reset(); 
          waitForWrite.Wait(); 
          unProcessed = Collapser.Collapse(unProcessed); 
          waitForCollapsing.Set(); 
          // tried this aswell instead of using my own locking, this is like Monitor.Enter 
          lock(this) { 
           unProcessed = Collapser.Collapse(unProcessed); 
          } 
         } 
         if (sum == 0) 
         { 
          // we wake the thread after 20 seconds, if nothing is in queue it will just go back and wait 
          unProcessedEvent.WaitOne(20000); 
         } 
         var gotOne = unProcessed.TryDequeue(out data); 
         if (gotOne) 
         { 
          ProcessTime(data); 
         } 
        } 
       } 
       catch (Exception e) 
       { 

       } 
      } 
     } 

     protected abstract void ProcessTime(Data d); 

     public void AddToQueue(Data d) 
     { 
      waitForCollapsing.Wait(); 
      waitForWrite.Reset(); 
      unProcessed.Enqueue(d); 
      waitForWrite.Set(); 
      unProcessedEvent.Set(); 
     } 

     // tried this aswell instead of using my own locking, this is like Monitor.Enter 
     public void AddToQueueAlternate(Data d) 
     { 
      lock(this) { 
       unProcessed.Enqueue(d); 
       waitForWrite.Set(); 
       unProcessedEvent.Set(); 
      } 
     } 
    } 
} 

這可以不鎖定嗎? 我可以使用更輕量級的鎖嗎?截至目前,只有一個線程添加數據和一個線程讀取。如果這能讓我獲得更好的鎖定,我可以保持這種狀態。

+0

您的消費者太慢了。所以通過讓它看起來重複來減慢速度是沒有意義的。讓製片人去做。 –

回答

1

爲什麼你會有重複?

如果發佈者真的可以添加重複項,那麼您需要某種類型的併發哈希對象(DictionaryHashSet)來檢測和防止這種情況發生在發佈者中。您可能也想調查ReaderWriterLockSlim

+0

嗯,它不完全重複。這是將寫入數據庫的對象。我刪除的「重複」是具有相同主鍵的那些。我做....在現有的更新,我刪除那些將被覆蓋反正 – klundby

2

如果你想併發和沒有重複,你應該使用ConcurrentDictionary

所以重新聲明你的隊列:

private ConcurrentDictionary<Data, Data> unProcessed = 
    new ConcurrentDictionary<Data, Data>(); 

這將大大簡化你的代碼,同時保持非常良好的性能。

+0

這也讓我保持秩序? – klundby

+0

不,不會,字典沒有排序。 – quantdev