2012-11-11 33 views
2

我從 Stack Overflow 問題「Disruptor.NET example」取得了代碼示例,並將其修改爲可以「測量」時間。完整的清單如下:(問題標題:爲什麼我的 Disruptor 例子很慢?)

using System; 
using System.Diagnostics; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using Disruptor; 
using Disruptor.Dsl; 

namespace DisruptorTest 
{ 
    /// <summary>
    /// Event object carried through the Disruptor; holds a single numeric payload.
    /// </summary>
    public sealed class ValueEntry
    {
     /// <summary>
     /// Announces every construction on the console so the number of
     /// entries created by the event factory is visible in the output.
     /// </summary>
     public ValueEntry()
     {
      Console.WriteLine("New ValueEntry created");
     }

     /// <summary>Payload written by the publisher and read by the handler.</summary>
     public long Value { get; set; }
    }

    /// <summary>
    /// Consumer-side handler: stops the shared stopwatch and prints how long the
    /// event took to travel from the publisher to this handler.
    /// </summary>
    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
     /// <summary>
     /// Stops <c>Program.sw</c>, converts the elapsed ticks to microseconds and
     /// writes the latency and the event's value to the console.
     /// </summary>
     /// <param name="data">The published entry.</param>
     /// <param name="sequence">Ring buffer sequence number of the entry.</param>
     /// <param name="endOfBatch">Not used by this handler.</param>
     public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
     {
      Program.sw.Stop();

      // Multiply before dividing. The previous form divided by
      // (Stopwatch.Frequency / 1000000), which truncates the divisor (e.g. a
      // 2.5 MHz timer became 2) and throws DivideByZeroException when the
      // timer frequency is below 1 MHz.
      long microseconds = Program.sw.ElapsedTicks * 1000000L / Stopwatch.Frequency;

      Console.WriteLine("elapsed microseconds = " + microseconds);
      Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
     }
    }

    /// <summary>
    /// Demo publisher: once per second it claims a ring buffer slot, stores a random
    /// value in it and publishes it, restarting the shared stopwatch just before the
    /// publish so the handler can report the hand-off time.
    /// </summary>
    class Program
    {
     // Shared timer: restarted here right before Publish and stopped by the handler.
     public static Stopwatch sw = Stopwatch.StartNew();

     private static readonly Random _random = new Random();

     // Ring buffer capacity. NOTE(review): Disruptor expects a power of 2 here,
     // not merely a multiple of 2 - confirm against the library documentation.
     private static readonly int _ringSize = 16;

     /// <summary>Wires up the disruptor with one handler and publishes forever.</summary>
     static void Main(string[] args)
     {
      var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);
      disruptor.HandleEventsWith(new ValueAdditionHandler());
      var ringBuffer = disruptor.Start();

      for (;;)
      {
       int payload = _random.Next();

       // Claim the next slot and fill it in place.
       long slot = ringBuffer.Next();
       ValueEntry claimed = ringBuffer[slot];
       claimed.Value = payload;

       // Start timing immediately before the entry becomes visible to the handler.
       sw.Restart();
       ringBuffer.Publish(slot);

       Console.WriteLine("Published entry {0}, value {1}", slot, claimed.Value);

       Thread.Sleep(1000);
      }
     }
    }
} 

輸出是:

New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
New ValueEntry created 
Published entry 0, value 1510145842 
elapsed microseconds = 2205 
Event handled: Value = 1510145842 (processed event 0 
Published entry 1, value 1718075893 
elapsed microseconds = 85 
Event handled: Value = 1718075893 (processed event 1 
Published entry 2, value 1675907645 
elapsed microseconds = 32 
Event handled: Value = 1675907645 (processed event 2 
Published entry 3, value 1563009446 
elapsed microseconds = 75 
Event handled: Value = 1563009446 (processed event 3 
Published entry 4, value 1782914062 
elapsed microseconds = 34 
Event handled: Value = 1782914062 (processed event 4 
Published entry 5, value 1516398244 
elapsed microseconds = 50 
Event handled: Value = 1516398244 (processed event 5 
Published entry 6, value 76829327 
elapsed microseconds = 50 
Event handled: Value = 76829327 (processed event 6 

因此,將數據從一個線程傳遞到另一個線程大約需要 50 微秒。但這一點也不快!「當前版本的 Disruptor 可以以每秒 1 百萬條消息的速率,在線程間實現約 50 ns 的延遲。」所以我的結果比預期慢了 1000 倍。

我的例子有什麼問題,以及如何達到50   ns的速度?

我修改了上面的程序,現在得到的延遲是 1 微秒,這好得多。不過,我仍在等待 Disruptor 模式專家的迴應。我正在尋找一個能夠證明確實可以在 50 ns 內傳遞數據的例子。

另外,我用 BlockingCollection 寫了相同的測試,平均延遲爲 14 微秒,這證明 Disruptor 更快:

使用BlockingCollection:

average = 14 minimum = 0 0-5 = 890558, 5-10 = 1773781, 10-30 = 6900128, >30 = 435433 

使用 Disruptor:

average = 0 minimum = 0 0-5 = 9908469, 5-10 = 64464, 10-30 = 19902, >30 = 7065 

BlockingCollection code:

using System; 
using System.Collections.Concurrent; 
using System.Diagnostics; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace DisruptorTest 
{ 
    /// <summary>
    /// Item passed through the queue; <see cref="Value"/> holds the item's index,
    /// which the benchmark uses to look up the matching stopwatch and result slot.
    /// </summary>
    public sealed class ValueEntry
    {
     /// <summary>
     /// Intentionally empty: the construction log line is switched off in this benchmark.
     /// </summary>
     public ValueEntry()
     {
     }

     /// <summary>Index of this item, set by the producer.</summary>
     public int Value { get; set; }
    }

    //public class ValueAdditionHandler : IEventHandler<ValueEntry> 
    //{ 
    // public void OnNext(ValueEntry data, long sequence, bool endOfBatch) 
    // { 

    //  long microseconds = Program.sw[data.Value].ElapsedTicks/(Stopwatch.Frequency/(1000L * 1000L)); 
    //  Program.results[data.Value] = microseconds; 
    //  //Console.WriteLine("elapsed microseconds = " + microseconds); 
    //  //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence); 
    // } 
    //} 

    /// <summary>
    /// Latency benchmark: the main thread enqueues <see cref="length"/> items into a
    /// bounded <see cref="BlockingCollection{T}"/> while a single consumer task takes
    /// them out and records, per item, the microseconds between enqueue and dequeue.
    /// </summary>
    class Program
    {
     // Number of items pushed through the queue.
     public const int length = 10000000;

     // One stopwatch per item, restarted immediately before the item is enqueued.
     public static Stopwatch[] sw = new Stopwatch[length];

     // Per-item latency in microseconds, written by the consumer.
     public static long[] results = new long[length];

     // Bounded queue (capacity 150) shared by producer and consumer.
     static BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(150);

     /// <summary>Runs the benchmark and prints average, minimum and a latency histogram.</summary>
     static void Main(string[] args)
     {
      for (int i = 0; i < length; i++)
      {
       sw[i] = Stopwatch.StartNew();
      }

      // Blocking consumer. GetConsumingEnumerable ends once CompleteAdding has been
      // called and the queue is drained, so no exception has to be swallowed.
      Task consumer = Task.Factory.StartNew(() =>
      {
       foreach (ValueEntry ve in dataItems.GetConsumingEnumerable())
       {
        // Multiply before dividing: dividing by (Frequency / 1000000) truncates
        // the divisor and fails with a division by zero below 1 MHz.
        results[ve.Value] = sw[ve.Value].ElapsedTicks * 1000000L / Stopwatch.Frequency;
       }
      }, TaskCreationOptions.LongRunning);

      for (int i = 0; i < length; i++)
      {
       ValueEntry entry = new ValueEntry();
       entry.Value = i;

       sw[i].Restart();
       dataItems.Add(entry);
      }

      // Signal the end of input and wait for the consumer to finish, instead of
      // sleeping for a fixed time and hoping every item has been delivered.
      dataItems.CompleteAdding();
      consumer.Wait();

      long total = 0;
      long minimum = long.MaxValue;
      int firstFive = 0;
      int fiveToTen = 0;
      int tenToThirty = 0;
      int moreThanThirty = 0;

      // Do not count first 100 items because they could be extremely slow
      for (int i = 100; i < length; i++)
      {
       long latency = results[i];
       total += latency;

       if (latency < minimum)
       {
        minimum = latency;
       }

       if (latency < 5)
       {
        firstFive++;
       }
       else if (latency < 10)
       {
        fiveToTen++;
       }
       else if (latency < 30)
       {
        tenToThirty++;
       }
       else
       {
        moreThanThirty++;
       }
      }

      long average = total / (length - 100);
      Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThanThirty);
     }
    }
} 

Disruptor 代碼:

using System; 
using System.Diagnostics; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using Disruptor; 
using Disruptor.Dsl; 

namespace DisruptorTest 
{ 
    /// <summary>
    /// Event object carried through the Disruptor; <see cref="Value"/> holds the
    /// item's index, which the handler uses to find its stopwatch and result slot.
    /// </summary>
    public sealed class ValueEntry
    {
     /// <summary>
     /// Intentionally empty: the construction log line is switched off in this benchmark.
     /// </summary>
     public ValueEntry()
     {
     }

     /// <summary>Index of this item, set by the publisher.</summary>
     public int Value { get; set; }
    }

    /// <summary>
    /// Consumer-side handler: records, for each event, the microseconds elapsed on
    /// that event's stopwatch into <c>Program.results</c>.
    /// </summary>
    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
     /// <summary>
     /// Reads the stopwatch indexed by <c>data.Value</c> and stores the elapsed
     /// time, in microseconds, at the same index of <c>Program.results</c>.
     /// </summary>
     /// <param name="data">The published entry; its Value is the item index.</param>
     /// <param name="sequence">Ring buffer sequence number (not used).</param>
     /// <param name="endOfBatch">Not used by this handler.</param>
     public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
     {
      // Multiply before dividing. Dividing by (Stopwatch.Frequency / 1000000)
      // truncates the divisor, skewing every result, and throws
      // DivideByZeroException when the timer frequency is below 1 MHz.
      long microseconds = Program.sw[data.Value].ElapsedTicks * 1000000L / Stopwatch.Frequency;
      Program.results[data.Value] = microseconds;
     }
    }

    /// <summary>
    /// Latency benchmark: publishes <see cref="length"/> events through a Disruptor
    /// ring buffer and summarizes the per-event latencies that the handler stored
    /// in <see cref="results"/>.
    /// </summary>
    class Program
    {
     // Number of events published.
     public const int length = 10000000;

     // One stopwatch per event, restarted immediately before the event is published.
     public static Stopwatch[] sw = new Stopwatch[length];

     // Per-event latency in microseconds, written by ValueAdditionHandler.
     public static long[] results = new long[length];

     // Ring buffer capacity; Disruptor requires a power of 2 (1024 = 2^10).
     private static readonly int _ringSize = 1024;

     /// <summary>Runs the benchmark and prints average, minimum and a latency histogram.</summary>
     static void Main(string[] args)
     {
      for (int i = 0; i < length; i++)
      {
       sw[i] = Stopwatch.StartNew();
      }

      var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

      disruptor.HandleEventsWith(new ValueAdditionHandler());

      var ringBuffer = disruptor.Start();

      for (int i = 0; i < length; i++)
      {
       long sequenceNo = ringBuffer.Next();

       ValueEntry entry = ringBuffer[sequenceNo];
       entry.Value = i;

       // Start timing immediately before the entry becomes visible to the handler.
       sw[i].Restart();
       ringBuffer.Publish(sequenceNo);
      }

      // Publishing has stopped, so Shutdown can drain the backlog: it returns only
      // after the handler has processed every published event. This replaces a
      // fixed 5 second sleep that merely hoped delivery had finished.
      disruptor.Shutdown();

      long total = 0;
      long minimum = long.MaxValue;
      int firstFive = 0;
      int fiveToTen = 0;
      int tenToThirty = 0;
      int moreThanThirty = 0;

      // Do not count first 100 items because they could be extremely slow
      for (int i = 100; i < length; i++)
      {
       long latency = results[i];
       total += latency;

       if (latency < minimum)
       {
        minimum = latency;
       }

       if (latency < 5)
       {
        firstFive++;
       }
       else if (latency < 10)
       {
        fiveToTen++;
       }
       else if (latency < 30)
       {
        tenToThirty++;
       }
       else
       {
        moreThanThirty++;
       }
      }

      long average = total / (length - 100);
      Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThanThirty);
     }
    }
} 
+4

您無法測量單個項目的耗時,Stopwatch(或任何計時機制)沒有足夠的精度。您應該對數以百萬計的項目進行計時,才能獲得統計上正確的測量結果。 – Euphoric

+0

@Euphoric Stopwatch 有足夠的精度來測量微秒 – javapowered

+3

沒有。此外,運行時環境、GC 以及向控制檯打印等因素都很容易干擾此類測試。在嘗試查找代碼錯誤之前,請先修正您的測試。 – Euphoric

回答

1

我修正了你的代碼:

using System; 
using System.Diagnostics; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using Disruptor; 
using Disruptor.Dsl; 

namespace DisruptorTest 
{ 
    /// <summary>
    /// Event object carried through the Disruptor; holds a single integer payload.
    /// </summary>
    public sealed class ValueEntry
    {
     /// <summary>
     /// Intentionally empty: the construction log line is switched off in this benchmark.
     /// </summary>
     public ValueEntry()
     {
     }

     /// <summary>Payload written by the publisher.</summary>
     public int Value { get; set; }
    }

    /// <summary>
    /// Throughput-style benchmark: publishes <see cref="length"/> events into a
    /// Disruptor ring buffer and prints the average publish time per event.
    /// No event handler is registered, so only the publishing side is measured.
    /// </summary>
    class Program
    {
     // Number of events published.
     public const int length = 1000000;

     // Timer covering the whole publish loop.
     public static Stopwatch sw;

     // Ring buffer capacity; Disruptor requires a power of 2 (1024 = 2^10).
     private static readonly int _ringSize = 1024;

     /// <summary>Publishes all events and prints the average time per event in milliseconds.</summary>
     static void Main(string[] args)
     {
      var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

      var ringBuffer = disruptor.Start();

      // Start timing after start-up so that building the ring buffer is not
      // attributed to the published items.
      sw = Stopwatch.StartNew();

      for (int i = 0; i < length; i++)
      {
       long sequenceNo = ringBuffer.Next();

       ValueEntry entry = ringBuffer[sequenceNo];
       entry.Value = i;

       ringBuffer.Publish(sequenceNo);
      }

      sw.Stop();

      // TotalMilliseconds is the whole elapsed time as a fractional number; the
      // previous "Elapsed.Miliseconds()" call does not exist and did not compile.
      double elapsed = sw.Elapsed.TotalMilliseconds;

      // wait until all events are delivered
      Thread.Sleep(10000);

      // Average time per published item, in milliseconds. The previous
      // expression had no left operand and did not compile.
      double average = elapsed / (double)length;
      Console.WriteLine("average = " + average);
     }
    }
} 

這樣應該可以正確測量每個項目所需的時間。

+1

這可能是錯誤的。您測試發佈每個項目需要多長時間。我需要測試發佈項目並在另一個線程中接收它需要多長時間。我認爲'ringBuffer.Publish(sequenceNo);'調用是異步的,這就是爲什麼你的代碼只測量一半的工作。可能會有很多「訂戶」,我認爲發佈商不會等到所有訂戶處理數據。 – javapowered