2012-11-12 61 views
6

在這個例子中https://stackoverflow.com/a/9980346/93647和這裏Why is my disruptor example so slow?干擾物的例子(在問題的末端)有1個出版商其中發佈項目和1名消費者。1個出版商和4名平行消費者

但在我的情況下,消費者的工作更加複雜,需要一定的時間。所以我想要4個並行處理數據的消費者。

因此,舉例來說,如果生產商生產的數字:1,2,3,4,5,6,7,8,9,10,11 ..

我想consumer1趕上1,5,9, ...消費者2捕捉2,6,10,...消費者3捕捉3,7,11,...消費者4捕捉4,8,12 ...(不完全是這些數字,這個想法是數據應該並行處理,我不關心哪個消費者處理哪個數字)

並且記住這需要並行完成,因爲在實際應用中消費者的工作非常昂貴。我希望消費者可以通過不同的線程來執行多核系統的功能。

我當然可以只創建4個ringbuffers並附加1名消費者1環形緩衝器。這樣我可以使用原始示例。但我覺得這是不正確的。有可能創建1個發佈者(1個ringbuffer)和4個消費者是正確的 - 因爲這是我需要的。

添加鏈接到谷歌羣體非常simular問題:https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

所以我們有兩個選擇:在每個另外

  • 一個圈許多消費者(每個消費者將「喚醒」,所有消費者應該擁有相同的WaitStrategy)每個消費者只需要處理它的數據就可以喚醒,每個消費者可以擁有自己的WaitStrategy)。

回答

1

編輯:我忘了提及代碼部分取自the FAQ。我不知道這種方法比弗蘭克的建議好還是差。

該項目嚴重記錄下來,這是一個恥辱,因爲它看起來不錯。
反正嘗試以下剪斷(根據您的第一個鏈接) - 在單測試,似乎是OK:

using System; 
using System.Threading.Tasks; 
using Disruptor; 
using Disruptor.Dsl; 

namespace DisruptorTest 
{ 
    public sealed class ValueEntry 
    { 
     public long Value { get; set; } 
    } 

    public class MyHandler : IEventHandler<ValueEntry> 
    { 
     private static int _consumers = 0; 
     private readonly int _ordinal; 

     public MyHandler() 
     { 
      this._ordinal = _consumers++; 
     } 

     public void OnNext(ValueEntry data, long sequence, bool endOfBatch) 
     { 
      if ((sequence % _consumers) == _ordinal) 
       Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal); 
      else 
       Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);      
     } 
    } 

    class Program 
    { 
     private static readonly Random _random = new Random(); 
     private const int SIZE = 16; // Must be multiple of 2 
     private const int WORKERS = 4; 

     static void Main() 
     { 
      var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default); 
      for (int i=0; i < WORKERS; i++) 
       disruptor.HandleEventsWith(new MyHandler()); 
      var ringBuffer = disruptor.Start(); 

      while (true) 
      { 
       long sequenceNo = ringBuffer.Next(); 
       ringBuffer[sequenceNo].Value = _random.Next();; 
       ringBuffer.Publish(sequenceNo); 
       Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value); 
       Console.ReadKey(); 
      } 
     } 
    } 
} 
+0

謝謝!會嘗試! – javapowered

+0

你知道如果調用ringbuffer'Next()''Publish'方法是線程安全的嗎?我可以平行稱呼他們嗎?我可以從兩個不同的線程調用ringbuffer'Next'方法嗎? – javapowered

+0

也是如何幹擾使內部?創建了多少個線程?干擾者爲每個消費者創建單獨的線程?或使用某種線程池? – javapowered

0

從環形緩衝區的規格中,您會看到每個消費者都會嘗試處理您的ValueEvent。在你的情況下,你不需要那樣。

我解決了它這樣的:

添加處理您ValueEvent,當消費者需要他在那場測試的情況下,如果是已經處理,他移動到下一個字段的字段。

不是最漂亮的方式,但它是如何工作的緩衝區。

+0

你需要了解它的場同步?你聲明它是volatile還是使用'Interlock'類來更新'bool'字段?另外如何將多個消費者連接到環形緩衝區?我只能將一個消費者傳遞給'HandleEventsWith'方法。到目前爲止,聽起來可以更容易地創建4個環形緩衝區,並使用下一個環形緩衝區循環發佈,以便每次發佈:) – javapowered

+0

如果創建4個環形緩衝區,將會失去環的「負載平衡」功能緩衝區,我想這就是你使用它的原因。在你的其他Q上,我使用的是JAVA緩衝區,所以我不能顯示你的代碼,但只是按照例子,他們很清楚。 – Frank

+0

在這種特殊情況下,我不需要「負載平衡」,因爲我的任務幾乎是「平等的」,只是將它們在4之間分開即可。不過這是一個有趣的功能。我應該遵循Java的例子嗎?因爲我幾乎找不到任何C#示例。 – javapowered

相關問題