2014-01-26 23 views
0

我知道我的問題違背了Disruptor API的基本聲明。但是當我正在學習它時,我寫了一個程序來替換我使用ArrayLinkedBlockingQueue的我的1P-1C用例。但是,當我運行這個程序時,我總是花費更多的時間來處理干擾,而不是ArrayLinkedBlockingQueue。我一定是做錯了什麼或者錯了,但我不確定我的計劃是什麼。有人有意見嗎?我沒有看到Disruptor的性能提升

(這是一個測試程序,所以很明顯我的事件處理程序沒有做任何事情)

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import com.lmax.disruptor.BusySpinWaitStrategy; 
import com.lmax.disruptor.EventFactory; 
import com.lmax.disruptor.EventHandler; 
import com.lmax.disruptor.EventTranslator; 
import com.lmax.disruptor.RingBuffer; 
import com.lmax.disruptor.dsl.Disruptor; 
import com.lmax.disruptor.dsl.ProducerType; 

public class SPSCDisruptorTest { 
    private static final int UNIT_SIZE = 1024; 
    private static final int BUFFER_SIZE = UNIT_SIZE * 1024 * 16; 
    private static final int ITERATIONS = BUFFER_SIZE; 
    private static final Logger logger = LoggerFactory 
      .getLogger(SPSCDisruptorTest.class); 

    private static class Data { 
     private String data; 

     public String getData() { 
      return data; 
     } 

     public void setData(String data) { 
      this.data = data; 
     } 

     @Override 
     public String toString() { 
      return "Data [data=" + data + "]"; 
     } 

     public final static EventFactory<Data> DATA_FACTORY = new EventFactory<Data>() { 

      @Override 
      public Data newInstance() { 
       return new Data(); 
      } 

     }; 
    } 

    private static class DataEventTranslator implements EventTranslator<Data> { 
     private String payload; 

     public DataEventTranslator(String payload) { 
      this.payload = payload; 
     } 

     @Override 
     public void translateTo(Data d, long sequence) { 
      d.setData(payload); 
     } 

    }; 

    public static void main(String[] args) throws InterruptedException { 
     new SPSCDisruptorTest().testDisruptor(); 
     new SPSCDisruptorTest().testExecutor(); 
    } 

    @SuppressWarnings("unchecked") 
    public void testDisruptor() { 
     ExecutorService exec = Executors.newSingleThreadExecutor(); 
     Disruptor<Data> disruptor = new Disruptor<Data>(
       SPSCDisruptorTest.Data.DATA_FACTORY, BUFFER_SIZE, exec, 
       ProducerType.SINGLE, new BusySpinWaitStrategy()); 
     disruptor.handleEventsWith(new EventHandler<Data>() { 

      @Override 
      public void onEvent(Data data, long sequence, boolean endOfBatch) 
        throws Exception { 
      } 

     }); 
     long t1 = System.nanoTime(); 
     RingBuffer<Data> buffer = disruptor.start(); 
     for (int i = 1; i <= ITERATIONS; i++) { 
      buffer.publishEvent(new DataEventTranslator("data" + i)); 
     } 
     logger.info("waiting for shutdown"); 
     disruptor.shutdown(); 
     logger.info("Disruptor Time (ms): " + (System.nanoTime() - t1 * 1.0) 
       /1000); 
     logger.info("Disruptor is shutdown"); 
     exec.shutdown(); 
    } 

    public void testExecutor() throws InterruptedException { 
     ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, 
       TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(
         BUFFER_SIZE)); 
     long t1 = System.nanoTime(); 
     for (int i = 1; i <= ITERATIONS; i++) { 
      executor.submit(new DataRunner("data" + i)); 
     } 
     executor.shutdown(); 
     executor.awaitTermination(5000, TimeUnit.SECONDS); 
     logger.info("Executor Time (ms): " + (System.nanoTime() - t1 * 1.0) 
       /1000); 
    } 

    private static class DataRunner implements Runnable { 
     private String data; 

     public DataRunner(String data) { 
      this.data = data; 
     } 

     @Override 
     public void run() { 
     } 

    } 
} 
+0

我已經在我的機器上試過了你的代碼我每次都會得到更好的結果。我使用了較小的緩衝區大小,雖然 – Edge

回答

0

你實際測量是錯誤的。你應該開始你的測量後,你已經開始了干擾器,因爲它需要時間暖身(分配環形緩衝區)。由於你的緩衝區大小很大,所以在熱身時需要相當長的時間。嘗試下面的示例代碼。它應該給你更好的時間。

RingBuffer<Data> buffer = disruptor.start(); 
    long t1 = System.nanoTime(); 
    for (int i = 1; i <= ITERATIONS; i++) { 
     buffer.publishEvent(new DataEventTranslator("data" + i)); 
    } 
    logger.info("waiting for shutdown"); 
    disruptor.shutdown(); 
    logger.info("Disruptor Time (ms): " + (System.nanoTime() - t1 * 1.0) 
      /1000); 
+0

不幸的是我沒有看到太多的區別。 與舊的方法干擾器採取:2.93E7毫秒 通過跟蹤時間後disruptor.start(),我得到2.6E7毫秒 所以,沒有太大的區別... – endless