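Getting better performance with ActiveMQ than with RabbitMQ
Is it unusual to get higher raw messaging throughput (for both publishing and consuming) with ActiveMQ than with RabbitMQ? I ask because every other reference I have found online shows RabbitMQ as the faster of the two.
I did not use a proper benchmarking tool. Instead, I modified two basic publisher/consumer examples to send 100,000 messages with a 3 KB message body. Note that publishing and consuming run on two different Amazon EC2 extra-large instances. Perhaps my code is not set up correctly? See the results and code below.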
ActiveMQ Send 3kb
Average Time per Message (ns): 497276.1179
Average # Messages per second: 2010.935101
Total Time (s): 49.72810906
ActiveMQ Recv 3kb
Average Time per Message (ns): 43813.35476
Average # Messages per second: 22823.86285
Total Time (s): 4.381379289
RabbitMQ Send 3kb
Average Time per Message (ns): 1041524.626
Average # Messages per second: 960.1309229
Total Time (s): 104.1524626
RabbitMQ Recv 3kb
Average Time per Message (ns): 612559.3732
Average # Messages per second: 1632.494814
Total Time (s): 61.25593732
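Updated numbers after removing queueDeclare() from RabbitMQ's Send.java and Recv.java:
This greatly improved the RabbitMQ times, but something must be off with ActiveMQ's consume time of only 4 seconds...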
ActiveMQ Send 3kb
Average Time per Message (ns): 491404.5666
Average # Messages per second: 2034.983124
Total Time (s): 49.14045666
ActiveMQ Recv 3kb
Average Time per Message (ns): 41976.17158
Average # Messages per second: 23823.03965
Total Time (s): 4.197617158
RabbitMQ Send 3kb
Average Time per Message (ns): 354795.8818
Average # Messages per second: 2818.522005
Total Time (s): 35.47958818
RabbitMQ Recv 3kb
Average Time per Message (ns): 440349.3892
Average # Messages per second: 2270.924009
Total Time (s): 44.03493892
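For reference, the three figures in each result group appear to be related by simple arithmetic: the first RabbitMQ Send run, for example, is consistent with an average of total time divided by 100,000 messages and a rate of 100,000 divided by total seconds. The following is a minimal sketch of that calculation. The class and method names are hypothetical and not part of the original test code.

```java
public class ThroughputSummary {

    /** Average time per message in nanoseconds. */
    public static double avgNanosPerMessage(long totalNanos, int messageCount) {
        return (double) totalNanos / messageCount;
    }

    /** Total run time converted from nanoseconds to seconds. */
    public static double totalSeconds(long totalNanos) {
        return totalNanos / 1e9;
    }

    /** Messages per second over the whole run. */
    public static double messagesPerSecond(long totalNanos, int messageCount) {
        return messageCount / totalSeconds(totalNanos);
    }

    public static void main(String[] args) {
        // Total from the first "RabbitMQ Send 3kb" run: 104.1524626 s.
        long totalNanos = 104_152_462_600L;
        int messageCount = 100_000;

        System.out.println("Average Time per Message (ns): "
                + avgNanosPerMessage(totalNanos, messageCount));
        System.out.println("Average # Messages per second: "
                + messagesPerSecond(totalNanos, messageCount));
        System.out.println("Total Time (s): " + totalSeconds(totalNanos));
    }
}
```

Because every figure derives from the summed per-message timings, the summary is only as meaningful as the span that each pair of System.nanoTime() calls encloses.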
ActiveMQ Send.java
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Send implements Runnable {
    private final static int NUMBER_OF_MESSAGES = 100000;
    private static long startTime = 0;
    private static long stopTime = 0;
    private static long totalTime = 0;

    public static void main(String[] argv) throws java.io.IOException {
        (new Thread(new Send())).start();
    }

    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("TEST.FOO");
            // Create a MessageProducer from the Session to the Topic or Queue
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Send exactly NUMBER_OF_MESSAGES messages (indices 0 to NUMBER_OF_MESSAGES - 1)
            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                startTime = System.nanoTime();
                // 3kb body
                String text = "AMFu8UlKW2zJBxUQbxNfU3HneB11uEOeC...";
                TextMessage message = session.createTextMessage(text);
                // Tell the producer to send the message
                //System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);
                stopTime = System.nanoTime();
                totalTime = totalTime + stopTime - startTime;
                System.out.println(i + "," + Long.toString(stopTime - startTime));
            }
            // Clean up
            session.close();
            connection.close();
            //System.out.println("");
            //System.out.println("Total Time: " + totalTime + "ns");
            //System.out.println("Avg. Time: " + totalTime/NUMBER_OF_MESSAGES + "ns");
            //System.out.println("");
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}
ActiveMQ Recv.java
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Recv implements Runnable {
    private static long startTime = 0;
    private static long stopTime = 0;
    private static long totalTime = 0;
    private static int numMessages = 0;

    public static void main(String[] argv)
            throws java.io.IOException {
        (new Thread(new Recv())).start();
    }

    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616");
            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("TEST.FOO");
            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);
            // Message Listener
            MyListener listener = new MyListener();
            consumer.setMessageListener(listener);
            // Wait for a message
            //Message message = consumer.receive(1000);
            // consumer.close();
            // session.close();
            // connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    public class MyListener implements MessageListener {
        public void onMessage(Message message) {
            try {
                // The timed span starts once the message has already been delivered
                // to the listener, so it covers only the type check and getText().
                startTime = System.nanoTime();
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    stopTime = System.nanoTime();
                    totalTime = totalTime + stopTime - startTime;
                    System.out.println(numMessages + "," + Long.toString(stopTime - startTime));
                    numMessages++;
                } else {
                    System.out.println("Received: " + message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
RabbitMQ Send.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send implements Runnable {
    private final static String QUEUE_NAME = "hello";
    private final static int NUMBER_OF_MESSAGES = 100000;
    private static long startTime = 0;
    private static long stopTime = 0;
    private static long totalTime = 0;
    // 3kb body
    private static final String message = "AMFu8UlKW2zJB...";

    public static void main(String[] argv)
            throws java.io.IOException {
        (new Thread(new Send())).start();
    }

    public void run() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            for (int i = 1; i <= NUMBER_OF_MESSAGES; i++) {
                startTime = System.nanoTime();
                // No Persistence
                // The queue is assumed to exist already; declaring it on every
                // iteration was removed from the timed loop.
                // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                stopTime = System.nanoTime();
                totalTime = totalTime + stopTime - startTime;
                System.out.println(i + "," + Long.toString(stopTime - startTime));
            }
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
RabbitMQ Recv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv implements Runnable {
    private final static String QUEUE_NAME = "hello";
    private static long startTime = 0;
    private static long stopTime = 0;
    private static long totalTime = 0;
    private static int numMessages = 0;

    public static void main(String[] argv) {
        (new Thread(new Recv())).start();
    }

    public void run() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("x.x.x.x");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // No Persistence
            // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Second argument true = automatic acknowledgement
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                // The timed span includes blocking in nextDelivery() until a message arrives.
                startTime = System.nanoTime();
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                stopTime = System.nanoTime();
                totalTime = totalTime + stopTime - startTime;
                System.out.println(numMessages + "," + Long.toString(stopTime - startTime));
                numMessages++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
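Hi pfreixes, that is a great observation. I will comment out the queue declaration and run the test again against an already-declared queue. I will be sure to update the results... thanks! – littleK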
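I added the new results to the original post. This did greatly improve the RabbitMQ times, and it now shows faster send times than ActiveMQ. However, one thing I still cannot understand is how ActiveMQ's consume time for 100,000 messages is only 4 seconds. I looked at the actual messages received, and they contain the correct content. Any other suggestions? – littleK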
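Yes, I have an idea about why your ActiveMQ is faster than an F1 car :) ... I think you should change how you measure the total time to receive a message. Right now you are only measuring the CPU time needed to run a few opcodes, because the message is already in the receiver's memory! Look at your code. Otherwise, the best way to measure the overhead of an intermediate broker is to put the start time into the message and read it back in Recv, which gives the real cost of sending one message. – pfreixes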
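The suggestion in the last comment can be sketched as follows. This is a minimal, broker-free illustration: a java.util.concurrent.BlockingQueue stands in for the broker, and the Stamped envelope, the class name, and the method names are all hypothetical. With a real JMS broker, the send time could instead travel as a message property (for example via Message.setLongProperty). Note also that System.nanoTime() is only comparable within a single JVM, so across two hosts a wall-clock timestamp and synchronized clocks would be needed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EndToEndLatency {

    /** Hypothetical envelope: the payload plus the sender's clock reading at publish time. */
    public static final class Stamped {
        final long sentAtNanos;
        final String body;

        Stamped(long sentAtNanos, String body) {
            this.sentAtNanos = sentAtNanos;
            this.body = body;
        }
    }

    /** Publishes count messages, stamping each one just before handing it to the stand-in broker. */
    public static void produce(BlockingQueue<Stamped> broker, int count, String body)
            throws InterruptedException {
        for (int i = 0; i < count; i++) {
            broker.put(new Stamped(System.nanoTime(), body));
        }
    }

    /** Consumes count messages and returns the summed send-to-receive latency in nanoseconds. */
    public static long consume(BlockingQueue<Stamped> broker, int count)
            throws InterruptedException {
        long totalNanos = 0;
        for (int i = 0; i < count; i++) {
            Stamped message = broker.take();
            // Latency is measured from the sender's stamp, not from the moment of receipt.
            totalNanos += System.nanoTime() - message.sentAtNanos;
        }
        return totalNanos;
    }

    public static void main(String[] args) throws Exception {
        final int count = 1000;
        final BlockingQueue<Stamped> broker = new ArrayBlockingQueue<>(64);

        Thread producer = new Thread(() -> {
            try {
                produce(broker, count, "payload");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long totalNanos = consume(broker, count);
        producer.join();
        System.out.println("Average end-to-end latency (ns): " + (double) totalNanos / count);
    }
}
```

Measured this way, the receive-side figure includes queueing and transfer time rather than only the cost of reading a message that has already arrived, so the two brokers would be compared over the same span.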