2016-03-06 29 views
1

的C#實現我有三個問題:做一個LinkedRingBuffer線程安全

  1. 你通常覺得我的方法來解決給定的問題是什麼?
  2. 您認爲我可以進一步提高性能?
  3. 最重要的一點:我如何讓我的實現真的線程安全?

首先簡化的場景我在: 我通過消息系統與不同設備進行通信。我在相當短的時間內接收併發送數千和數千條消息。我處於多線程環境中,因此許多不同的任務正在發送和期待消息。對於消息接收來說,事件驅動方法讓我們在使線程安全的意義上遇到了很多麻煩。 我有幾個接收器任務從外部獲取消息,並且必須將這些消息傳遞給許多消費者任務。

所以我想出了一個不同的方法: 爲什麼沒有幾千條消息的歷史記錄,每條新消息都排隊,消費者任務可以從最新的項目向後搜索到最後處理的項目以獲得所有新到的消息。當然這必須是快速和線程安全的。

我想出了一個連接環形緩衝區的想法,並實施了以下內容:

public class LinkedRingBuffer<T> 
    { 
    private LinkedRingBufferNode<T> firstNode; 
    private LinkedRingBufferNode<T> lastNode; 

    public LinkedRingBuffer(int capacity) 
    { 
     Capacity = capacity; 
     Count = 0; 
    } 

    /// <summary> 
    /// Maximum count of items inside the buffer 
    /// </summary> 
    public int Capacity { get; } 

    /// <summary> 
    /// Actual count of items inside the buffer 
    /// </summary> 
    public int Count { get; private set; } 

    /// <summary> 
    /// Get value of the oldest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetFirst() 
    { 
     return firstNode.Item; 
    } 

    /// <summary> 
    /// Get value of the newest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetLast() 
    { 
     return lastNode.Item; 
    } 

    /// <summary> 
    /// Add item at the end of the buffer. 
    /// If capacity is reached the link to the oldest item is deleted. 
    /// </summary> 
    public void Add(T item) 
    { 
     /* create node and set to last one */ 
     var node = new LinkedRingBufferNode<T>(lastNode, item); 
     lastNode = node; 
     /* if it is the first node, the created is also the first */ 
     if (firstNode == null) 
      firstNode = node; 
     /* check for capacity reach */ 
     Count++; 
     if(Count > Capacity) 
     {/* deleted all links to the current first so that its eventually gc collected */ 
      Count = Capacity; 
      firstNode = firstNode.NextNode; 
      firstNode.PreviousNode = null; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to the newest item 
    /// </summary> 
    public IEnumerable<T> LastToFirst() 
    { 
     var current = lastNode; 
     while(current != null) 
     { 
      yield return current.Item; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to the oldest item 
    /// </summary> 
    public IEnumerable<T> FirstToLast() 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      current = current.NextNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to given item. 
    /// If item doesn't exist it iterates until it reaches the newest 
    /// </summary> 
    public IEnumerable<T> LastToReference(T item) 
    { 
     var current = lastNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to given item. 
    /// If item doesn't exist it iterates until it reaches the oldest 
    /// </summary> 
    public IEnumerable<T> FirstToReference(T item) 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Represents a linked node inside the buffer and holds the data 
    /// </summary> 
    private class LinkedRingBufferNode<A> 
    { 
     public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item) 
     { 
      Item = item; 
      NextNode = null; 
      PreviousNode = previousNode; 
      if(previousNode != null) 
       previousNode.NextNode = this; 
     } 
     internal A Item { get; } 
     internal LinkedRingBufferNode<A> PreviousNode { get; set; } 
     internal LinkedRingBufferNode<A> NextNode { get; private set; } 
    } 
    } 

但遺憾的是我一種新的多線程環境,所以我怎麼會做這個緩衝區線程安全的多個讀取和寫入?

謝謝!

回答

1

我認爲最簡單的方法就是在每次執行線程關鍵代碼時都有一個同步object,您將lock打開。 lock塊中的代碼被稱爲critical section,並且一次只能由一個線程訪問。任何其他希望訪問它的線程都將等待,直到鎖被釋放。

定義和初始化:

private object Synchro; 

public LinkedRingBuffer(int capacity) 
{ 
    Synchro = new object(); 
    // Other constructor code 
} 

用法:

public T GetFirst() 
{ 
    lock(Synchro) 
    { 
     return firstNode.Item; 
    } 
} 

當編寫線程安全的代碼,lock荷蘭國際集團某些部分似乎是顯而易見的。但是,如果你不知道是否要lock聲明或代碼塊,用於讀取和寫入的安全,你需要考慮:此代碼

  • 是否能影響其他的行爲或結果鎖定關鍵部分。
  • 是否有其他鎖定關鍵部分可以影響此代碼的行爲或結果。

您還需要重寫某些自動實現的屬性以具有後臺字段。它應該是非常簡單的,但是...

你的yield return使用,同時在單一線程上下文相當聰明,高效的,會導致在多線程方面的麻煩。這是因爲yield return doesn't release a lock statement(它不應該)。您將不得不在包裝中使用yield return進行實現。

你的線程安全的代碼看起來是這樣的:

public class LinkedRingBuffer<T> 
{ 
    private LinkedRingBufferNode<T> firstNode; 
    private LinkedRingBufferNode<T> lastNode; 
    private object Synchro; 

    public LinkedRingBuffer(int capacity) 
    { 
     Synchro = new object(); 
     Capacity = capacity; 
     Count = 0; 
    } 

    /// <summary> 
    /// Maximum count of items inside the buffer 
    /// </summary> 
    public int Capacity { get; } 

    /// <summary> 
    /// Actual count of items inside the buffer 
    /// </summary> 
    public int Count 
    { 
     get 
     { 
      lock (Synchro) 
      { 
       return _count; 
      } 
     } 
     private set 
     { 
      _count = value; 
     } 
    } 
    private int _count; 

    /// <summary> 
    /// Get value of the oldest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetFirst() 
    { 
     lock (Synchro) 
     { 
      return firstNode.Item; 
     } 
    } 

    /// <summary> 
    /// Get value of the newest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetLast() 
    { 
     lock (Synchro) 
     { 
      return lastNode.Item; 
     } 
    } 

    /// <summary> 
    /// Add item at the end of the buffer. 
    /// If capacity is reached the link to the oldest item is deleted. 
    /// </summary> 
    public void Add(T item) 
    { 
     lock (Synchro) 
     { 
      /* create node and set to last one */ 
      var node = new LinkedRingBufferNode<T>(lastNode, item); 
      lastNode = node; 
      /* if it is the first node, the created is also the first */ 
      if (firstNode == null) 
       firstNode = node; 
      /* check for capacity reach */ 
      Count++; 
      if (Count > Capacity) 
      { 
       /* deleted all links to the current first so that its eventually gc collected */ 
       Count = Capacity; 
       firstNode = firstNode.NextNode; 
       firstNode.PreviousNode = null; 
      } 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to the newest item 
    /// </summary> 
    public IEnumerable<T> LastToFirst() 
    { 
     lock (Synchro) 
     { 
      var materialized = LastToFirstInner().ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> LastToFirstInner() 
    { 
     var current = lastNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to the oldest item 
    /// </summary> 
    public IEnumerable<T> FirstToLast() 
    { 
     lock (Synchro) 
     { 
      var materialized = FirstToLastInner().ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> FirstToLastInner() 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      current = current.NextNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to given item. 
    /// If item doesn't exist it iterates until it reaches the newest 
    /// </summary> 
    public IEnumerable<T> LastToReference(T item) 
    { 
     lock (Synchro) 
     { 
      var materialized = LastToReferenceInner(item).ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> LastToReferenceInner(T item) 
    { 
     var current = lastNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to given item. 
    /// If item doesn't exist it iterates until it reaches the oldest 
    /// </summary> 
    public IEnumerable<T> FirstToReference(T item) 
    { 
     lock (Synchro) 
     { 
      var materialized = FirstToReferenceInner(item).ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> FirstToReferenceInner(T item) 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Represents a linked node inside the buffer and holds the data 
    /// </summary> 
    private class LinkedRingBufferNode<A> 
    { 
     public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item) 
     { 
      Item = item; 
      NextNode = null; 
      PreviousNode = previousNode; 
      if (previousNode != null) 
       previousNode.NextNode = this; 
     } 
     internal A Item { get; } 
     internal LinkedRingBufferNode<A> PreviousNode { get; set; } 
     internal LinkedRingBufferNode<A> NextNode { get; private set; } 
    } 
} 

可以有一些優化完成,例如你不需要創建臨界區裏面的LinkedRingBufferNode對象,但是你必須複製在創建對象之前將lastNode值賦給臨界區域內的局部變量。

+0

感謝您的快速回復!所以當迭代器的一個使用ToList通話將創造一個全新的列表這種方法(同步),它可以在foreach迭代之後,是正確的?或者是鎖定整個foreach迭代過程?我還擔心ToList()調用會對具有幾千個項目的緩衝區產生巨大的性能影響,但我會在明天運行一個基準測試。也許只是忘記了foreach,並用鎖定的GetPrevious()/ GetNext()調用手動搜索列表會更快。 – sirloin