我正在Java中使用RabbitMQ。
我有兩個RabbitMQ服務器,具有相同的配置,一個是開發環境,另一個是生產環境。
這是對消費者的聲明:java - RabbitMQ消費者沒有收到消息
/*
* Connection and channel declaration
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(prop.getProperty("ConnectionURI"));
connection = factory.newConnection();
channel = connection.createChannel();
/*
* Queue declaration and exchange binding
*/
channel.exchangeDeclare(prop.getProperty("printExchange"), "topic", false, false, false, new HashMap<>());
queueName = prop.getProperty("printQueue");
routing_key = "print." + codCliente + "." + idCassa;
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, prop.getProperty("printExchange"), routing_key);
這裏,它開始偵聽的隊列:
JAyronPOS.LOGGER.info("Waiting for a message on the queue -> " + queueName + " with routingkey -> " + routing_key);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
JAyronPOS.LOGGER.info("This is the received message -> " + queueName + ": " + new String(body, "UTF-8"));
Map<String, Object> headers = properties.getHeaders();
if (envelope.getRoutingKey().equals(routing_key)) {
JAyronPOS.LOGGER.info("Message is for me, because it has my routing key");
channel.basicAck(envelope.getDeliveryTag(), false);
if (headers != null) {
if (headers.containsKey("command")) {
JAyronPOS.LOGGER.info("It's a command!");
JAyronPOS.LOGGER.info(headers.get("command").toString());
if ("requestClose".equals(headers.get("command").toString())) {
ChiusuraFiscaleConfirm confirm = gson.fromJson(new String(body, "UTF-8"), ChiusuraFiscaleConfirm.class);
if (confirm.isCanClose()) {
eseguiChiusuraFiscale();
} else {
JOptionPane.showMessageDialog(null, "Can't close", "Error", JOptionPane.ERROR_MESSAGE);
}
} else {
JAyronPOS.LOGGER.info("Can't handle the message");
}
}
} else {
System.out.println("It's a ticket");
TicketWrapper ticket = gson.fromJson(new String(body, "UTF-8"), TicketWrapper.class);
printTicket(ticket);
}
}else{
JAyronPOS.LOGGER.info("The message isn't for me, because it has the routingkey: "+envelope.getRoutingKey());
}
}
};
channel.basicConsume(queueName, false, consumer);
在開發環境中,我有最多5個隊列,而在生產環境中,我有在150到200個隊列之間。
消息由交換機發送,並帶有個人路由密鑰。發送的消息數量不高(強調時不超過10 msg/s)。
當我在開發環境中測試消費者時,一切正常:
- 我發送一個RPC調用,服務器對它進行處理並回復。消費者閱讀回覆並調用正確的方法。全部在大約1-2秒。
當我在生產環境中使用軟件時(我只是通過在config.properties文件中註釋/分解一行來更改環境),它不起作用:
- 我發送RPC調用,服務器對它進行處理,在隊列上發送回覆。消費者永遠不會收到消息(但我可以看到Web管理面板在隊列中開發的消息)。
這可能是問題嗎?
編輯:我注意到,如果我發送RPC調用,在RabbitMQ web面板中的回覆隊列中,在「Deliver」(淺藍色)下有一條消息,而如果我發送3-4個RPC調用(與前一個相同),在某個呼叫之後,在回覆隊列中,在發佈(黃色)下有一條消息,並且消費者接收到回覆。