2013-08-22 152 views
1

我第一次使用MQ並嘗試使用RabbitMQ實現日誌記錄系統。我的實施涉及'發件人'在消費者正在收聽消息之前,消費者沒有收到來自MQ的消息

/* 
* This class sends messages over MQ 
*/ 
public class MQSender { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     String[] levels = {"green", "orange", "red", "black"}; 
     for (String log_level : levels) { 
      String message = "This is a " + log_level + " message"; 
      System.out.println("Sending " + log_level + " message"); 
      //publish the message with each of the bindings in levels 
      channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes()); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

它向我的每個顏色發送一條消息到交換機,其中顏色將用作綁定。它涉及一個「接收器」

public class MQReceiver { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     receiveMessagesFromQueue(2); 
    } 

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     //generate random queue 
     String queueName = channel.queueDeclare().getQueue(); 

     //set bindings from 0 to maxLevel for the queue 
     for (int level = 0; level <= maxLevel; level++) { 
      channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]); 
     } 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(queueName, true, consumer); 

     while(true) { 
      //waits until a message is delivered then gets that message 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      String routingKey = delivery.getEnvelope().getRoutingKey(); 

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); 
     } 
    } 
} 

這是考慮作爲一個參數代表我想哪個顏色綁定它從交換供給一個數字。

在我的實現和一般RabbitMQ中,似乎郵件存儲在交換機中,直到Consumer請求它們,此時它們被分配到它們各自的隊列,然後一次一個地發送到客戶端(或MQ語言中的消費者)。我的問題是,當我在運行MQReceiver類之前運行MQSender類時,消息永遠不會傳遞。但是,當我首先運行MQReceiver類時,會收到消息。從我對MQ的理解中,我會認爲這些消息應該存儲在服務器上,直到類運行,然後這些消息應該傳遞給它們的消費者,但這不是發生的事情。我的主要問題是這些消息是否可以存儲在交換機中,如果不存在,它們應該存儲在何處,以便在消費者(即我的MQReceiver類)被調用後交付它們?

感謝您的幫助!

+0

只是一個猜測,但我懷疑你的'Sender'被丟棄由於缺乏註冊'消費者' – StormeHawke

+0

的消息你可能有autoAck設置爲tru è?更多信息在這裏:http://www.rabbitmq.com/tutorials/tutorial-two-java.html –

+0

這也許? http://stackoverflow.com/questions/6386117/rabbitmq-use-of-immediate-and-mandatory-bits –

回答

1

如果RabbitMQ的路由密鑰與綁定到交換機的任何隊列不匹配,則丟棄消息。當您首先啓動MQSender時,沒有隊列被綁定,因此它發送的消息將丟失。當你開始MQReceiver時,它將隊列綁定到交換機,所以RabbitMQ有一個地方可以放置來自MQSender的消息。當您停止MQReceiver時,由於您創建了匿名隊列,因此將從交換機中刪除隊列和所有綁定。

如果您希望郵件存儲在服務器上,而MQReceiver未運行,則需要接收方創建命名隊列,並將路由鍵綁定到該隊列。請注意,創建一個已命名的隊列是冪等的,如果該隊列已經存在,則不會創建該隊列。然後你需要接收者從指定的隊列中取消消息。

更改您的代碼看起來是這樣的:

MQSender

.... 
String namedQueue = "logqueue"; 
//declare named queue and bind log level routing keys to it. 
//RabbitMQ will put messages with matching routing keys in this queue 
channel.queueDeclare(namedQueue, false, false, false, null); 
for (int level = 0; level < LOG_LEVELS.length; level++) { 
    channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]); 
} 
... 

MQReceiver

... 
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

QueueingConsumer consumer = new QueueingConsumer(channel); 

//Consume messages off named queue instead of anonymous queue 
String namedQueue = "logqueue"; 
channel.basicConsume(namedQueue, true, consumer); 

while(true) { 
... 
相關問題