2013-03-25 82 views
4

我有4個主題。一個是從網絡上讀取一些信息寫入變量,並在每個信號之後發出信號。其中3人正在閱讀這個變量,應該只讀一次。當前的解決方案是編寫者在寫入並等待讀者事件後設置事件。讀者等待事件,然後閱讀並設置他們的事件(意思是他們閱讀)。問題是讀者可以閱讀不止一次,而且我有重複的內容。我如何才能達到讀者準確讀一次的規則?C#One Writer許多讀者只閱讀一次

+0

我會從任何可行的解決方案更喜歡開始,然後做的更好 – syned 2013-03-25 21:13:22

+1

我想讀者應該處理重複寫消息。我認爲你不應該把它留給客戶來假設一次改變事件只會觸發一次。例如,有可能導致此行爲的網絡場景。 – neontapir 2013-03-25 21:15:28

回答

2

一種方法是以下

的數據線程作爲一個單向鏈表之間共享。列表中的每個節點都可以是標記或具有數據。該列表以作爲輸入到標記的單個節點開始。當讀取數據時,會形成一個新的列表,其中包含一系列數據節點,後面跟着一個標記。該列表將追加到添加到列表中的最新標記。

每一個讀者線索都從參考原始標記節點和AutoResetEvent開始。每當有新的數據進入寫入器時,它就會爲每個讀取器線程發出信號AutoResetEvent。讀者線程將會直接走到找到沒有Next節點的標記。

該方案確保所有讀者只能看到一次數據。最大的難題是構建列表,以便可以無鎖地寫入和讀取它。這是非常有Interlocked.CompareExchange雖然

鏈表型直線前進

class Node<T> { 
    public bool IsMarker; 
    public T Data; 
    public Node<T> Next; 
} 

樣品作家型

class Writer<T> { 
    private List<AutoResetEvent> m_list; 
    private Node<T> m_lastMarker; 

    public Writer(List<AutoResetEvent> list, Node<T> marker) { 
    m_lastMarker = marker; 
    m_list = list; 
    } 

    // Assuming this can't overlap. If this can overload then you will 
    // need synchronization in this method around the writing of 
    // m_lastMarker  
    void OnDataRead(T[] items) { 
    if (items.Length == 0) { 
     return; 
    } 

    // Build up a linked list of the new data followed by a 
    // Marker to signify the end of the data. 
    var head = new Node<T>() { Data = items[0] }; 
    var current = head; 
    for (int i = 1; i < items.Length; i++) { 
     current.Next = new Node<T>{ Data = items[i] }; 
     current = current.Next; 
    } 
    var marker = new Node<T> { IsMarker = true }; 
    current.Next = marker; 

    // Append the list to the end of the last marker node the writer 
    // created 
    m_lastMarker.Next = head; 
    m_lastMarker = marker; 

    // Tell each of the readers that there is new data 
    foreach (var e in m_list) { 
     e.Set(); 
    } 
    } 
} 

樣品讀卡類型

class Reader<T> { 
    private AutoResetEvent m_event; 
    private Node<T> m_marker; 

    void Go() { 
    while(true) { 
     m_event.WaitOne(); 
     var current = m_marker.Next; 
     while (current != null) { 
     if (current.IsMarker) { 
      // Found a new marker. Always record the marker because it may 
      // be the last marker in the chain 
      m_marker = current; 
     } else { 
      // Actually process the data 
      ProcessData(current.Data); 
     } 
     current = current.Next; 
     } 
    } 
    } 
} 
+0

這聽起來有點複雜...... – syned 2013-03-25 21:21:17

+1

@syned正確的多線程代碼很少簡單 – JaredPar 2013-03-25 21:23:56

0

我會建議ConcurrentQueue - 它guarnatees每個線程從隊列中獲得一個唯一的實例。 Here是一個很好的解釋如何使用它。

ConnurrentQueue<T>.TryDequeue()是一個線程安全的方法,用於檢查隊列是否爲空,以及是否不從隊列中獲取項目。既然它同時執行兩個操作,程序員不必擔心競態條件。實現這個

+0

因此,作家將新數據添加到隊列中,然後讀者讀取並刪除該元素? – syned 2013-03-25 21:17:50

+0

@syned,正確。 TryDequeue()同時兼有 – alexm 2013-03-25 21:32:29

+0

但是編寫者知道應該將多少元素添加到該隊列中?或者它會自動複製三個元素中的元素? – syned 2013-03-25 21:43:01

0

我想我已經找到了辦法去。我創建了2個AutoResetEvent數組,每個讀取器有2個事件,等待寫入事件和設置讀取事件,並且寫入器設置所有寫入事件並等待所有讀取事件。

JaredPar,你的答案是有用的,可以幫助我

1

我的意見,即表示你應該在代碼的消費者線程接受相同的值多次獲得的可能性同意。也許最簡單的方法是爲每個更新添加一個順序標識符。這樣,線程可以將順序標識與它讀取的最後一個標識進行比較,並知道它是否重複。

它也知道它是否錯過了一個值。

但是,如果你真的需要它們鎖定步驟並且只能獲得一次值,那麼我建議你使用兩個ManualResetEvent對象和一個CountdownEvent。以下是如何使用它們。

ManualResetEvent DataReadyEvent = new ManualResetEvent(); 
ManualResetEvent WaitForResultEvent = new ManualResetEvent(); 
CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads); 

讀者線程等待DataReadyEvent

當其他線程讀取來自網絡的價值,它這樣做:

Acknowledgement.Reset(NumWaitingThreads); 
DataReadyEvent.Set(); // signal waiting threads to process 
Acknowledgement.WaitOne(); // wait for all threads to signal they got it. 
DataReadyEvent.Reset(); // block threads' reading 
WaitForResultEvent.Set(); // tell threads they can continue 

等待的線程做到這一點:

DataReadyEvent.WaitOne(); // wait for value to be available 
// read the value 
Acknowledgement.Set(); // acknowledge receipt 
WaitForResultEvent.WaitOne(); // wait for signal to proceed 

這與具有每等待兩個事件相同的效果線程,但更簡單。

但它的缺點是,如果一個線程崩潰,這將掛起倒計時事件。但是,如果生產者線程等待所有的線程消息,你的方法也會如此。

1

這是一個很適合the Barrier class

您可以使用兩個Barriers在兩個狀態之間進行觸發。

下面是一個例子:

using System; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    internal class Program 
    { 
     private static void Main(string[] args) 
     { 
      int readerCount = 4; 

      Barrier barrier1 = new Barrier(readerCount + 1); 
      Barrier barrier2 = new Barrier(readerCount + 1); 

      for (int i = 0; i < readerCount; ++i) 
      { 
       Task.Factory.StartNew(() => reader(barrier1, barrier2)); 
      } 

      while (true) 
      { 
       barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point. 

       if ((value % 10000) == 0)  // Print message every so often. 
        Console.WriteLine(value); 

       barrier2.SignalAndWait(); // Wait for the reader threads to read the current value. 
       ++value;     // Produce the next value. 
      } 
     } 

     private static void reader(Barrier barrier1, Barrier barrier2) 
     { 
      int expected = 0; 

      while (true) 
      { 
       barrier1.SignalAndWait(); // Wait for "new data available". 

       if (value != expected) 
       { 
        Console.WriteLine("Expected " + expected + ", got " + value); 
       } 

       ++expected; 
       barrier2.SignalAndWait(); // Signal that we've read the data, and wait for all other threads. 
      } 
     } 

     private static volatile int value; 
    } 
} 
+0

它看起來像個好主意,謝謝,我會調查它 – syned 2013-03-25 22:10:46