1

Disruptor Getting Started Guide之後,我已經與一個生產者和一個消費者建立了一個最小的破壞者。爲什麼使用較小的環形緩衝區會干擾較慢?

生產者

import com.lmax.disruptor.RingBuffer; 

public class LongEventProducer 
{ 
    private final RingBuffer<LongEvent> ringBuffer; 

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) 
    { 
     this.ringBuffer = ringBuffer; 
    } 

    public void onData() 
    { 
     long sequence = ringBuffer.next(); 
     try 
     { 
      LongEvent event = ringBuffer.get(sequence); 
     } 
     finally 
     { 
      ringBuffer.publish(sequence); 
     } 
    } 
} 

消費者(請注意消費者什麼都不做onEvent

import com.lmax.disruptor.EventHandler; 

public class LongEventHandler implements EventHandler<LongEvent> 
{ 
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) 
    {} 
} 

我的目標是性能測試繞了一大圈緩衝去一次與穿越小環多次。在每種情況下,總操作數(bufferSize X rotations)是相同的。我發現,隨着環形緩衝區變小,操作/秒速率急劇下降。

RingBuffer Size | Revolutions | Total Ops | Mops/sec 

    1048576  |  1  | 1048576 |  50-60 

     1024  |  1024  | 1048576 |  8-16 

     64  |  16384 | 1048576 | 0.5-0.7 

     8  |  131072 | 1048576 | 0.12-0.14 

問:什麼是業績的大規模下降的原因時,環緩衝區大小減少,但總的迭代次數是固定的?這種趨勢與WaitStrategySingle vs MultiProducer無關 - 吞吐量降低,但趨勢相同。

主要(注意SingleProducerBusySpinWaitStrategy

import com.lmax.disruptor.BusySpinWaitStrategy; 
import com.lmax.disruptor.dsl.Disruptor; 
import com.lmax.disruptor.RingBuffer; 
import com.lmax.disruptor.dsl.ProducerType; 

import java.util.concurrent.Executor; 
import java.util.concurrent.Executors; 

public class LongEventMainJava{ 
     static double ONEMILLION = 1000000.0; 
     static double ONEBILLION = 1000000000.0; 

    public static void main(String[] args) throws Exception { 
      // Executor that will be used to construct new threads for consumers 
      Executor executor = Executors.newCachedThreadPool();  

      // TUNABLE PARAMS 
      int ringBufferSize = 1048576; // 1024, 64, 8 
      int rotations = 1; // 1024, 16384, 131702 

      // Construct the Disruptor 
      Disruptor disruptor = new Disruptor<>(new LongEventFactory(), ringBufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); 

      // Connect the handler 
      disruptor.handleEventsWith(new LongEventHandler()); 

      // Start the Disruptor, starts all threads running 
      disruptor.start(); 

      // Get the ring buffer from the Disruptor to be used for publishing. 
      RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
      LongEventProducer producer = new LongEventProducer(ringBuffer); 

      long start = System.nanoTime(); 
      long totalIterations = rotations * ringBufferSize; 
      for (long i = 0; i < totalIterations; i++) { 
       producer.onData(); 
      } 
      double duration = (System.nanoTime()-start)/ONEBILLION; 
      System.out.println(String.format("Buffersize: %s, rotations: %s, total iterations = %s, duration: %.2f seconds, rate: %.2f Mops/s", 
        ringBufferSize, rotations, totalIterations, duration, totalIterations/(ONEMILLION * duration))); 
     } 
} 

並運行,你需要瑣碎代碼

import com.lmax.disruptor.EventFactory; 

public class LongEventFactory implements EventFactory<LongEvent> 
{ 
    public LongEvent newInstance() 
    { 
     return new LongEvent(); 
    } 
} 

運行在覈心i5-2400,12GB RAM ,windows 7

樣本輸出

Buffersize: 1048576, rotations: 1, total iterations = 1048576, duration: 0.02 seconds, rate: 59.03 Mops/s 

Buffersize: 64, rotations: 16384, total iterations = 1048576, duration: 2.01 seconds, rate: 0.52 Mops/s 

回答

2

當製片人(S)填補了環形緩衝區,它必須等待,直到事件能夠繼續之前消耗。

當您的緩衝區大小與您要放入的元素數量大小相同時,製作人員無需等待。它永遠不會溢出。它所做的只是增加計數,索引,並將數據發佈到該索引處的環形緩衝區中。

當你的緩衝區較小時,它仍然只是遞增計數和發佈,但它比消費者能夠消耗的速度快。因此生產者必須等到元素被消耗並且環形緩衝區上的空間被釋放。

+0

謝謝。那麼爲什麼在這個例子中是我的消費者,它實際上什麼都不做,只能訪問與生產者相比比較慢的基礎'LongEvent'?我曾假設製片人會是限制因素。 –

+0

@AdamHughes你的部分不做任何事情,但'Disrupto'r基礎設施在調用'onEvent'方法之前做了一些工作。恰恰相反,這比你的製作人更多的工作。 –

0

好像問題就在於這個代碼塊中lmax\disruptor\SingleProducerSequencer

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) 
     { 
      cursor.setVolatile(nextValue); // StoreLoad fence 

      long minSequence; 
      while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) 
      { 
       waitStrategy.signalAllWhenBlocking(); 
       LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 
      } 

      this.cachedValue = minSequence; 
     } 

特別調用LockSupport.parkNanos(1L)。這可能需要最多15ms on Windows。當生產者到達緩衝區的末尾並等待消費者時,這會被調用。其次,當緩衝區很小時,RingBuffer的錯誤共享可能會發生。我猜測這兩種效應都在起作用。

最後,在基準測試之前,我可以使用JIT加速代碼,其中有一百萬次調用onData()。這得到了最好的情況下,> 80Mops/sec,但沒有消除緩衝收縮的退化。