2012-10-02 29 views
1

我在想,如果使用ActiveMQ而不是RabbitMQ來實現更快的原始消息傳遞吞吐量(對於發佈和消費)是不尋常的?我在問,因爲我遇到過的每一個其他在線參考都讓RabbitMQ更快。使用ActiveMQ獲得比RabbitMQ更好的性能

我沒有使用合法的基準測試工具進行測試;相反,我修改了兩個基本的發佈者/消費者示例,以使用3千字節的郵件正文測試100,000條消息。請注意,我正在測試跨兩個不同Amazon EC2 x大型實例的發佈和消耗。也許我沒有正確設置我的代碼?請參閱下面的結果和代碼。

ActiveMQ Send 3kb 
Average Time per Message (ns): 497276.1179 
Average # Messages per second: 2010.935101 
Total Time (s):     49.72810906 

ActiveMQ Recv 3kb 
Average Time per Message (ns): 43813.35476 
Average # Messages per second: 22823.86285 
Total Time (s):     4.381379289 

RabbitMQ Send 3kb 
Average Time per Message (ns): 1041524.626 
Average # Messages per second: 960.1309229 
Total Time (s):     104.1524626 

RabbitMQ Recv 3kb 
Average Time per Message (ns): 612559.3732 
Average # Messages per second: 1632.494814 
Total Time (s):     61.25593732 

更新號碼RabbitMQ的Send.java & Recv.java去除queueDeclare()後:

這極大地改善了的RabbitMQ的時間,但一些必須的只有4秒ActiveMQ的消費時間是關閉...

ActiveMQ Send 3kb 
Average Time per Message (ns): 491404.5666 
Average # Messages per second: 2034.983124 
Total Time (s):     49.14045666 

ActiveMQ Recv 3kb 
Average Time per Message (ns): 41976.17158 
Average # Messages per second: 23823.03965 
Total Time (s):     4.197617158 

RabbitMQ Send 3kb 
Average Time per Message (ns): 354795.8818 
Average # Messages per second: 2818.522005 
Total Time (s):     35.47958818 

RabbitMQ Recv 3kb 
Average Time per Message (ns): 440349.3892 
Average # Messages per second: 2270.924009 
Total Time (s):     44.03493892 

ActiveMQ的Send.java

public class Send implements Runnable { 

private final static int NUMBER_OF_MESSAGES = 100000; 
private static long startTime = 0; 
private static long stopTime = 0; 
private static long totalTime = 0; 

public static void main(String[] argv) throws java.io.IOException { 
    (new Thread(new Send())).start(); 
} 

public void run() { 
    try { 
     // Create a ConnectionFactory 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 

     // Create a Connection 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 

     // Create a Session 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

     // Create the destination (Topic or Queue) 
     Destination destination = session.createQueue("TEST.FOO"); 

     // Create a MessageProducer from the Session to the Topic or Queue 
     MessageProducer producer = session.createProducer(destination); 
     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 

     for (int i=0; i <= NUMBER_OF_MESSAGES; i++){ 
      startTime = System.nanoTime(); 

      // 3kb 
      String text = "AMFu8UlKW2zJBxUQbxNfU3HneB11uEOeC..." 

      TextMessage message = session.createTextMessage(text); 

// Tell the producer to send the message 
      //System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName()); 
      producer.send(message); 
      stopTime = System.nanoTime(); 
      totalTime = totalTime + stopTime-startTime; 
      System.out.println(i + "," + Long.toString(stopTime-startTime)); 

     } 

     // Clean up 
     session.close(); 
     connection.close(); 

     //System.out.println(""); 
     //System.out.println("Total Time: " + totalTime + "ns"); 
     //System.out.println("Avg. Time: " + totalTime/NUMBER_OF_MESSAGES + "ns"); 
     //System.out.println(""); 

    } 
    catch (Exception e) { 
     System.out.println("Caught: " + e); 
     e.printStackTrace(); 
    } 
} 
} 

的ActiveMQ Recv.java

public class Recv implements Runnable { 

private static long startTime = 0; 
private static long stopTime = 0; 
private static long totalTime = 0; 
private static int numMessages = 0; 

public static void main(String[] argv) 
    throws java.io.IOException { 

    (new Thread(new Recv())).start(); 

} 

public void run() { 
    try { 

     // Create a ConnectionFactory 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616"); 

     // Create a Connection 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 

     // Create a Session 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

     // Create the destination (Topic or Queue) 
     Destination destination = session.createQueue("TEST.FOO"); 

     // Create a MessageConsumer from the Session to the Topic or Queue 
     MessageConsumer consumer = session.createConsumer(destination); 

     // Message Listener 
     MyListener listener = new MyListener(); 
     consumer.setMessageListener(listener); 

     // Wait for a message 
     //Message message = consumer.receive(1000); 

     // consumer.close(); 
     // session.close(); 

// connection.close(); 
    } catch (Exception e) { 
     System.out.println("Caught: " + e); 
     e.printStackTrace(); 
    } 
} 

public class MyListener implements MessageListener { 
    public void onMessage(Message message) { 
     try { 
      startTime = System.nanoTime(); 
      if (message instanceof TextMessage) { 
       TextMessage textMessage = (TextMessage) message; 
       String text = textMessage.getText(); 
       stopTime = System.nanoTime(); 
       totalTime = totalTime + stopTime-startTime; 

       System.out.println(numMessages + "," + Long.toString(stopTime-startTime)); 

       numMessages++; 

      } else { 
       System.out.println("Received: " + message); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 
} 

的RabbitMQ Send.java

public class Send implements Runnable { 

private final static String QUEUE_NAME = "hello"; 
private final static int NUMBER_OF_MESSAGES = 100000; 
private static long startTime = 0; 
private static long stopTime = 0; 
private static long totalTime = 0; 

// 3kb 
private static final String message = "AMFu8UlKW2zJB..." 

public static void main(String[] argv) 
throws java.io.IOException { 

(new Thread(new Send())).start(); 

} 

public void run() { 

try { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    for (int i=1; i <= NUMBER_OF_MESSAGES; i++){ 
     startTime = System.nanoTime(); 

     // No Persistence 
     // channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
     channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 

     stopTime = System.nanoTime(); 
     totalTime = totalTime + stopTime-startTime; 
     System.out.println(i + "," + Long.toString(stopTime-startTime)); 
    } 

    channel.close(); 
    connection.close(); 

} catch (Exception e) { 
    e.printStackTrace(); 
} 
} 
} 

的RabbitMQ Recv.java

private final static String QUEUE_NAME = "hello"; 
private static long startTime = 0; 
private static long stopTime = 0; 
private static long totalTime = 0; 
private static int numMessages = 0; 

public static void main(String[] argv) { 
    (new Thread(new Recv())).start(); 
} 

public void run(){ 
    try { 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("x.x.x.x"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     // No Persistence 
     // channel.queueDeclare(QUEUE_NAME, false, false, false, null); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(QUEUE_NAME, true, consumer); 

     while (true) { 
      startTime = System.nanoTime(); 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      stopTime = System.nanoTime(); 
      totalTime = totalTime + stopTime-startTime; 

      System.out.println(numMessages + "," + Long.toString(stopTime-startTime)); 

      numMessages++; 

     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 
} 

回答

2

好吧,我看了你的代碼和benchmarck的標記,但只是在Recv的方式。我看到RabbitMq數字是ActiveMq的兩倍。然後我看到了兩者的源代碼,並警告我..

在Rabbitqm Recv源代碼中,對於每條消息,您總是會執行queuDeclare,如果通信時間是當前主要延遲時間,那麼確保ActiveMq的雙倍時間比Rabbitmq來自這裏。

+0

嗨pfreixes,這是一個偉大的觀察。我將註釋掉隊列聲明並再次將測試運行到已聲明的隊列。我一定會更新結果......謝謝! – littleK

+0

我在原帖中添加了新的結果。這確實極大地改善了RabbitMQ的時間,現在它顯示比ActiveMQ更快的發送時間。但是,我仍然無法理解的一件事是ActiveMQ的消耗時間對於100,000條消息而言只有4秒。我查看了收到的實際消息,並且它們包含正確的內容。還有其他建議嗎? – littleK

+0

是的,我有一個關於你的ActiveMQ的想法是比一個F1快:) ......我想你應該改變你如何接收一條消息的總時間。目前您只需要一個CPU時間來運行多個操作碼,該消息已經在Receiver內存中!看看你的代碼。否則,最好的方法是檢查一箇中間代理的開銷時間是如何將開始時間放入消息中的,並從Recv中獲取它以確定發送一個味精的真實成本。 – pfreixes

相關問題