2014-05-17

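amqp - Using automatic recovery when the connection is broken

I am trying to use the automatic recovery feature when the connection is broken. I have 2 problems: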

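1) The automatic recovery code appears to ignore networkRecoveryInterval completely. Within 1 minute of the connection breaking, my log file grew to 1.5 GB. The following error keeps repeating.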

Caught an exception during connection recovery!
java.net.SocketException: Network is unreachable
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:388)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:360)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:48)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:345)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    at java.lang.Thread.run(Thread.java:722)

2) Finally, when I switch my router back on, network recovery does not work. I get this in my log file.

Caught an exception when recovering topology Caught an exception while recovering queue 8923yrbk
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering queue 8923yrbk
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:459)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverEntities(AutorecoveringConnection.java:424)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:48)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:345)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 45 seconds
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:230)
    at com.rabbitmq.client.impl.recovery.RecordedQueue.recover(RecordedQueue.java:36)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:448)
    ... 7 more
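For the first problem, the behaviour expected from networkRecoveryInterval is roughly a retry loop with a fixed delay between attempts. The following is a minimal sketch of that idea in plain Java, without the RabbitMQ client. The class FixedIntervalRetry, its retry method, and the maxAttempts parameter are hypothetical names used only for illustration; this is not how the client implements recovery.

```java
import java.net.SocketException;
import java.util.concurrent.Callable;

final class FixedIntervalRetry {

    /**
     * Calls the action until it succeeds, sleeping intervalMillis between
     * failed attempts. After maxAttempts failures the last exception is rethrown.
     */
    static <T> T retry(Callable<T> action, long intervalMillis, int maxAttempts)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Wait before the next attempt so failures cannot repeat in a tight loop.
                if (attempt < maxAttempts) {
                    Thread.sleep(intervalMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Simulated connect call (an assumption): fails twice, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new SocketException("Network is unreachable");
            }
            return "connected";
        }, 50, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In this sketch, an interval of 30000 ms would allow at most one failed attempt, and so at most one logged stack trace, every 30 seconds, which is what the setting in the code below is meant to achieve.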

Here is my rather simple consumer-side code. Note that I run all of it in a new thread because I do not want the constructor to block.

private ConnectionFactory factory = null;
private Connection connection = null;
private Channel channel = null;

private PaymentInfoFromGlobalServerConsumer() {
    new Thread(new Runnable() {
        public void run() {
            factory = new ConnectionFactory();
            try {
                factory.setUri(amqpServerUrl);
                factory.setAutomaticRecoveryEnabled(true);
                factory.setNetworkRecoveryInterval(30000); // In case of broken connection, try again every 30 seconds (hope this is correct understanding)
                factory.setRequestedHeartbeat(45); // Keep sending the heartbeat every 45 seconds to prevent any routers from considering the connection stale.
            } catch (KeyManagementException | NoSuchAlgorithmException | URISyntaxException e) {
                // Will never happen if configured properly
                logger.error(e);
                return;
            }

            try {
                connection = factory.newConnection();
                channel = connection.createChannel();
                // Create a durable queue (if not already present)
                channel.queueDeclare(merchantId, true, false, false, null);

                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(merchantId, false, consumer);

                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String billId = new String(delivery.getBody());

                    // TODO - Redeliveries are possible as per design
                    System.out.println(" [x] Received '" + billId + "'");
                    System.out.println(" [x] Done");

                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            } catch (IOException | ConsumerCancelledException | InterruptedException e) {
                e.printStackTrace();
                logger.error(e);
            } catch (ShutdownSignalException e) {
                System.out.println(e.isInitiatedByApplication() + " " + e.isHardError());
            } finally {
                close();
            }
        }
    }).start();
}

public void close() {
    try {
        if (channel != null) channel.close();
    } catch (IOException | AlreadyClosedException e) {
        // Cannot do anything now
    }
    try {
        if (connection != null) connection.close();
    } catch (IOException | AlreadyClosedException e) {
        // Cannot do anything now
    }
}

I am new to amqp, so any help is appreciated. Thanks.

Answer

-1

Try recovering the topology along with the connection.

factory.setTopologyRecoveryEnabled(true); 
+5

It is enabled by default – Strikki
