2012-07-21 46 views
1

我想了解RabbitMQ服務器與發件人和接收器程序。現在,當發送者發送單個消息並且接收者將收到相同的消息時,整個設置工作良好。發送者和接收器對在RabbitMQ

但是,當我發送兩條消息(通過運行發送器兩次)並運行接收器程序兩次,我只得到第一條消息。

發件人

ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 


     channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
     String message = "He12!"; 
     channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
     System.out.println("Sent "+message); 
     channel.close(); 
     connection.close(); 

接收機

ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(QUEUE_NAME, true, consumer); 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    /*channel.basicCancel(consumer.getConsumerTag()); */ 

    String message; 
    if (delivery != null) { 
     message = new String(delivery.getBody()); 
     System.out.println("Reciever .."+message); 

    } 

    channel.close(); 
    connection.close(); 

回答

0

你有沒有嘗試過沒有

channel.basicCancel(consumer.getConsumerTag()); 

線?

此外,它會更好做這樣的事情:

while(true){ 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    String message; 
    if (delivery != null) { 
    message = new String(delivery.getBody()); 
    System.out.println("Reciever .."+message); 
    } 
} 

,讓你只有一個消費者的消費在循環中的所有消息。這只是一個例子,我只是有一種乾淨的方式來打破循環,而不是必須ctrl-c。

+0

感謝您的回覆..我曾嘗試沒有basicCancel(編輯相同的代碼),但問題仍然相同。我認爲更重要的是沒有一個while循環,這樣就可以讓Web服務一次發送一個消息。每個請求跨越一位消費者是不是一個好主意? – 2012-07-22 13:51:05

+0

通過以調試模式執行接收器並在RabitMq服務器中運行幾個命令,我觀察到在執行basicConsume函數調用後不久,隊列大小變爲零(命令rabbitmqctl list_queues) – 2012-07-22 16:55:28

+0

@praveena_kd在執行使用者之前,隊列大小是多少? – robthewolf 2012-07-22 18:20:51

1

當我用no_Ack = false和一個對BasicAck的調用修改了Receiver時,我解決了這個問題。感謝@robthewolf和@Card(通過twitter)提供幫助。

PFB現在修改過的接收器。

 ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(QUEUE_NAME, false, consumer); 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(10); 
    /*channel.basicCancel(consumer.getConsumerTag()); */ 

    String message = null; 

    if (delivery != null) { 
     message = new String(delivery.getBody()); 
     System.out.println("Reciever .."+message); 
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
    } 

    channel.close(); 
    connection.close(); 
    return message;