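I am trying to use the automatic recovery feature of the RabbitMQ Java client when the connection breaks. I have two problems:
1) The automatic recovery code appears to ignore networkRecoveryInterval completely. Within 1 minute of the connection dropping, my log file grew to 1.5 GB. The following error repeats continuously.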
Caught an exception during connection recovery!
java.net.SocketException: Network is unreachable
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:388)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:360)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:48)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:345)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    at java.lang.Thread.run(Thread.java:722)
2) Finally, when I switch my router back on, network recovery does not work. I get this in my log file.
Caught an exception when recovering topology Caught an exception while recovering queue 8923yrbk
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering queue 8923yrbk
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:459)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverEntities(AutorecoveringConnection.java:424)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:48)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:345)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 45 seconds
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:230)
    at com.rabbitmq.client.impl.recovery.RecordedQueue.recover(RecordedQueue.java:36)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:448)
    ... 7 more
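To make problem 1 concrete: what I would expect from a recovery interval is a fixed pause between reconnection attempts, so a 30 second interval should give roughly two attempts (and two stack traces) per minute rather than a continuous stream. Below is a minimal JDK-only sketch of that expectation. The names `FixedIntervalRetry`, `Sleeper` and `retry` are hypothetical and made up for illustration; this is not how the RabbitMQ client is implemented, only the behaviour I assumed the setting would give.

```java
import java.net.SocketException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the RabbitMQ client.
final class FixedIntervalRetry {

    /** Abstraction over Thread.sleep so the pause can be observed or replaced. */
    public interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /**
     * Calls the action until it succeeds. After each failed attempt except the
     * last one, pauses for intervalMillis. Once maxAttempts attempts have
     * failed, rethrows the exception from the final attempt.
     */
    public static <T> T retry(Callable<T> action, long intervalMillis,
                              int maxAttempts, Sleeper sleeper) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                throw e; // do not retry when the thread was interrupted
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleeper.sleep(intervalMillis); // the fixed wait between attempts
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Simulated connect: fails twice, then succeeds on the third attempt.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new SocketException("Network is unreachable");
            }
            return "connected";
        }, 100, 5, Thread::sleep);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With a loop like this, the number of logged failures is bounded by the elapsed time divided by the interval, which is why a log that grows to 1.5 GB in one minute suggests that no wait is happening between attempts.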
Here is my rather simple consumer-side code. Note that I run all of the code in a new thread because I do not want the constructor to block.
private ConnectionFactory factory = null;
private Connection connection = null;
private Channel channel = null;

private PaymentInfoFromGlobalServerConsumer() {
    new Thread(new Runnable() {
        public void run() {
            factory = new ConnectionFactory();
            try {
                factory.setUri(amqpServerUrl);
                factory.setAutomaticRecoveryEnabled(true);
                factory.setNetworkRecoveryInterval(30000); // In case of broken connection, try again every 30 seconds (hope this is correct understanding)
                factory.setRequestedHeartbeat(45); // Keep sending the heartbeat every 45 seconds to prevent any routers from considering the connection stale.
            } catch (KeyManagementException | NoSuchAlgorithmException | URISyntaxException e) {
                // Will never happen if configured properly
                logger.error(e);
                return;
            }
            try {
                connection = factory.newConnection();
                channel = connection.createChannel();
                // Create a durable queue (if not already present)
                channel.queueDeclare(merchantId, true, false, false, null);
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(merchantId, false, consumer);
                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String billId = new String(delivery.getBody());
                    // TODO - Redeliveries are possible as per design
                    System.out.println(" [x] Received '" + billId + "'");
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            } catch (IOException | ConsumerCancelledException | InterruptedException e) {
                e.printStackTrace();
                logger.error(e);
            } catch (ShutdownSignalException e) {
                System.out.println(e.isInitiatedByApplication() + " " + e.isHardError());
            } finally {
                close();
            }
        }
    }).start();
}

public void close() {
    try {
        if (channel != null) channel.close();
    } catch (IOException | AlreadyClosedException e) {
        // Cannot do anything now
    }
    try {
        if (connection != null) connection.close();
    } catch (IOException | AlreadyClosedException e) {
        // Cannot do anything now
    }
}
I am new to AMQP, so any help is appreciated. Thanks.
Enabled by default – Strikki