2010-09-02 31 views
2

我有1個線程流數據和第2個(線程池)處理數據。數據處理需要大約100ms,所以我使用第二個線程,以便不佔用第一個線程。我應該如何在C#中編寫生產者/消費者代碼?

當第二個線程正在處理數據時,第一個線程將數據添加到字典緩存中,然後當第二個線程完成時它處理緩存的值。

我的問題是這應該怎麼做在C#中的生產者/消費者代碼?

public delegate void OnValue(ulong value); 

public class Runner 
{ 
    public event OnValue OnValueEvent; 
    private readonly IDictionary<string, ulong> _cache = new Dictionary<string, ulong>(StringComparer.InvariantCultureIgnoreCase); 
    private readonly AutoResetEvent _cachePublisherWaitHandle = new AutoResetEvent(true); 

    public void Start() 
    { 
     for (ulong i = 0; i < 500; i++) 
     { 
      DataStreamHandler(i.ToString(), i); 
     } 
    } 

    private void DataStreamHandler(string id, ulong value) 
    { 
     _cache[id] = value; 

     if (_cachePublisherWaitHandle.WaitOne(1)) 
     { 
      IList<ulong> tempValues = new List<ulong>(_cache.Values); 
      _cache.Clear(); 

      _cachePublisherWaitHandle.Reset(); 

      ThreadPool.UnsafeQueueUserWorkItem(delegate 
      { 
       try 
       { 
        foreach (ulong value1 in tempValues) 
         if (OnValueEvent != null) 
          OnValueEvent(value1); 
       } 
       finally 
       { 
        _cachePublisherWaitHandle.Set(); 
       } 
      }, null); 
     } 
     else 
     { 
      Console.WriteLine(string.Format("Buffered value: {0}.", value)); 
     } 
    } 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
     Stopwatch sw = Stopwatch.StartNew(); 
     Runner r = new Runner(); 
     r.OnValueEvent += delegate(ulong x) 
           { 
            Console.WriteLine(string.Format("Processed value: {0}.", x)); 
            Thread.Sleep(100); 

            if(x == 499) 
            { 
             sw.Stop(); 
             Console.WriteLine(string.Format("Time: {0}.", sw.ElapsedMilliseconds)); 
            } 
           }; 
     r.Start(); 
     Console.WriteLine("Done"); 
     Console.ReadLine(); 
    } 
} 

回答

2

關於同步生產者和消費者,MSDN上有一個很好的article。在Albahari網站上也有一個很好的例子。

+1

該Albahari網站是正確的。令人遺憾的是,MSDN文章有一個不正確的實現,可能會導致您在多個消費者中生活的情況。 – 2010-09-02 16:40:24

4

設置生產者 - 消費者模式的最佳做法是使用可在.NET 4.0中獲得的BlockingCollection類,或單獨下載Reactive Extensions框架。這個想法是生產者將使用Add方法排隊,並且消費者將使用Take方法出隊,該方法阻止隊列是否爲空。就像SwDevMan81指出,如果你想要去手動路線,Albahari網站對如何使它正常工作有一個很好的寫法。