我第一次使用MQ並嘗試使用RabbitMQ實現日誌記錄系統。我的實施涉及'發件人'在消費者正在收聽消息之前,消費者沒有收到來自MQ的消息
/*
* This class sends messages over MQ
*/
public class MQSender {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] levels = {"green", "orange", "red", "black"};
for (String log_level : levels) {
String message = "This is a " + log_level + " message";
System.out.println("Sending " + log_level + " message");
//publish the message with each of the bindings in levels
channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
}
channel.close();
connection.close();
}
}
它向我的每個顏色發送一條消息到交換機,其中顏色將用作綁定。它涉及一個「接收器」
public class MQReceiver {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
receiveMessagesFromQueue(2);
}
public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//generate random queue
String queueName = channel.queueDeclare().getQueue();
//set bindings from 0 to maxLevel for the queue
for (int level = 0; level <= maxLevel; level++) {
channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
//waits until a message is delivered then gets that message
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
這是考慮作爲一個參數代表我想哪個顏色綁定它從交換供給一個數字。
在我的實現和一般RabbitMQ中,似乎郵件存儲在交換機中,直到Consumer
請求它們,此時它們被分配到它們各自的隊列,然後一次一個地發送到客戶端(或MQ語言中的消費者)。我的問題是,當我在運行MQReceiver
類之前運行MQSender
類時,消息永遠不會傳遞。但是,當我首先運行MQReceiver
類時,會收到消息。從我對MQ的理解中,我會認爲這些消息應該存儲在服務器上,直到類運行,然後這些消息應該傳遞給它們的消費者,但這不是發生的事情。我的主要問題是這些消息是否可以存儲在交換機中,如果不存在,它們應該存儲在何處,以便在消費者(即我的MQReceiver
類)被調用後交付它們?
感謝您的幫助!
只是一個猜測,但我懷疑你的'Sender'被丟棄由於缺乏註冊'消費者' – StormeHawke
的消息你可能有autoAck設置爲tru è?更多信息在這裏:http://www.rabbitmq.com/tutorials/tutorial-two-java.html –
這也許? http://stackoverflow.com/questions/6386117/rabbitmq-use-of-immediate-and-mandatory-bits –