2015-11-26

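Large number of SSL clients in ActiveMQ

I have a problem with a large number (1000) of clients in ActiveMQ, using the failover protocol, SSL (NIO) and simpleAuthenticationPlugin. After some time the broker just stops. I looked for errors in the log file but found nothing: it is empty.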

Server: Ubuntu 15.04, Java version "1.7.0_80"

Basic architecture info

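I have a distributed system. It consists of C# clients (nodes) and one Java client (the manager). I am using ActiveMQ 5.12.1, installed on the same host where the manager runs. Communication takes place only between the C# clients and the manager; the C# clients do not communicate with each other.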

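Every client (C# and Java) listens on one queue. When the manager wants to send a message to a node, it uses that node's unique queue. When a node wants to communicate with the manager, it sends a message to the manager queue. The manager polls every node once a minute. The manager has 2 threads: one receives messages, the other requests info from the nodes.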

Here is the manager code for receiving messages:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://(nio+ssl://localhost:61617)");
connectionFactory.setUserName("user");
connectionFactory.setPassword("password");
Connection connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("manager");
MessageConsumer consumer = session.createConsumer(destination);
Message message = null;
while (!stop) {
    try {
        message = consumer.receive();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            processMessage(text);
        }
    }
    catch (Exception e) {
        Logger.getInstance().log("Exception in message receive loop continue");
    }
}

Manager code for sending messages:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://(nio+ssl://localhost:61617)");
connectionFactory.setUserName("user");
connectionFactory.setPassword("password");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
while (!stop) {
    for (everynode) {
        Destination destination = session.createQueue(uniqueNodeQueue);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        TextMessage message = session.createTextMessage("in");
        producer.send(message);
        producer.close();
    }
    Thread.sleep(time);
}

// Clean up
session.close();
connection.close();

The C# clients use Apache.NMS.ActiveMQ:

brokerUri = "failover://(ssl://" + Server + ":61617?transport.acceptInvalidBrokerCert=true";

public NonDurableQueueSubscriber(string queueName, string brokerUri)
{
    this.queueName = queueName;
    ConnectionFactory cf = new ConnectionFactory(brokerUri);
    cf.Password = "password";
    cf.UserName = "user";
    this.connectionFactory = cf;
    this.connection = this.connectionFactory.CreateConnection();
    if (this.connection.IsStarted)
        connection.Stop();
    this.connection.Start();
    this.session = connection.CreateSession();
    ActiveMQQueue queue = new ActiveMQQueue(queueName);
    this.consumer = this.session.CreateConsumer(queue, "2 > 1", false);
    this.consumer.Listener += new MessageListener(OnMessage);
}

public void OnMessage(IMessage message)
{
    ITextMessage textMessage = message as ITextMessage;
    if (this.OnMessageReceived != null)
    {
        this.OnMessageReceived(textMessage.Text);
    }
}

static void onMessageReceived(string message)
{
    NonDurableQueuePublisher mypublisher = new NonDurableQueuePublisher(queueManager, activemqBrokerUrl);
    mypublisher.SendMessage(info());
    mypublisher.Dispose();
}

public NonDurableQueuePublisher(String queueName, string brokerUri)
{
    this.queueName = queueName;
    ConnectionFactory cf = new ConnectionFactory(brokerUri);
    cf.Password = "password";
    cf.UserName = "user";
    this.connectionFactory = cf;
    this.connection = connectionFactory.CreateConnection();
    this.connection.Start();
    this.session = connection.CreateSession();
    ActiveMQQueue queue = new ActiveMQQueue(queueName);
    this.producer = session.CreateProducer(queue);
}

public void SendMessage(String msg)
{
    if (!this.isDisposed)
    {
        ITextMessage txtMessage = session.CreateTextMessage(msg);
        //txtMessage.NMSPersistent = false;
        producer.Send(txtMessage);
    }
    else
    {
        throw new ObjectDisposedException(this.GetType().FullName);
    }
}

ActiveMQ broker configuration in activemq.xml:

<beans 
    xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">


<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 
    <property name="locations"> 
     <value>file:${activemq.conf}/credentials.properties</value> 
    </property> 
</bean> 

<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" 
     lazy-init="false" scope="singleton" 
     init-method="start" destroy-method="stop"> 
</bean> 

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" deleteAllMessagesOnStartup="true" dataDirectory="${activemq.data}"> 

    <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
      <policyEntry topic=">" > 

       <pendingMessageLimitStrategy> 
       <constantPendingMessageLimitStrategy limit="1000"/> 
       </pendingMessageLimitStrategy> 
      </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 


    <managementContext> 
     <managementContext createConnector="false"/> 
    </managementContext> 


    <persistenceAdapter> 
     <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter> 

     <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage percentOfJvmHeap="70" /> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="100 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="50 gb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 



<plugins> 
<simpleAuthenticationPlugin> 
<users> 
<authenticationUser username="user" password="password" groups="users,admins"/> 
</users> 
</simpleAuthenticationPlugin> 
</plugins> 
<sslContext> 
     <sslContext keyStore="/ssl/broker.ks" 
        keyStorePassword="password"/> 
    </sslContext> 
    <transportConnectors> 
    <transportConnector name="nio+ssl" uri="nio+ssl://0.0.0.0:61617?maximumConnections=20000&amp;wireFormat.maxFrameSize=104857600"/> 

    </transportConnectors> 


    <shutdownHooks> 
     <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> 
    </shutdownHooks> 

</broker> 

<import resource="jetty.xml"/> 

</beans>

Some interesting log excerpts from when I used NIO:

2015-11-25 17:20:31,495 | ERROR | Could not accept connection from null: java.io.IOException: javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack? | org.apache.activemq.broker.TransportConnector | ActiveMQ BrokerService[localhost] Task-616 
2015-11-25 17:20:31,495 | WARN | Transport Connection to: tcp://192.168.1.4:51939 failed: javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack? | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ NIO Worker 151 
2015-11-25 17:20:25,493 | ERROR | Could not accept connection from null: java.io.IOException: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0 | org.apache.activemq.broker.TransportConnector | ActiveMQ BrokerService[localhost] Task-787 
2015-11-25 17:20:19,309 | ERROR | Could not accept connection from null: java.io.IOException: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0 | org.apache.activemq.broker.TransportConnector | ActiveMQ BrokerService[localhost] Task-669 
2015-11-25 17:19:53,051 | WARN | Transport Connection to: tcp://192.168.1.23:51587 failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive (no connection attempt made) for too (>30000) long: tcp://192.168.1.23:51587 | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker 

Before NIO I tried plain SSL only, and the result was the same: the broker shut down.

Any ideas?

Answer

You appear to be exceeding the maximum connection limit configured in activemq.xml. You can check the number of connections via JMX to confirm this.
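
The JMX check can also be scripted. The following is a minimal sketch, not a definitive tool. It assumes remote JMX has been enabled on the broker (the configuration in the question has `createConnector="false"`, so no connector is available as posted), that the JMX URL is the usual default on port 1099, and that the broker MBean exposes an attribute named `CurrentConnectionsCount`. Both the URL and the attribute name are assumptions to verify in jconsole before relying on them. The class name `BrokerConnectionCount` is made up for this example.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerConnectionCount {

    // Builds the broker MBean name; "localhost" matches brokerName in the question's activemq.xml.
    static ObjectName brokerObjectName(String brokerName) throws Exception {
        return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName);
    }

    // Reads the assumed CurrentConnectionsCount attribute through any MBean server connection.
    static int currentConnections(MBeanServerConnection mbs, String brokerName) throws Exception {
        Object value = mbs.getAttribute(brokerObjectName(brokerName), "CurrentConnectionsCount");
        return ((Number) value).intValue();
    }

    public static void main(String[] args) throws Exception {
        // Assumed JMX endpoint; adjust host, port and path to the broker's actual JMX settings.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            System.out.println("Current connections: " + currentConnections(mbs, "localhost"));
        }
    }
}
```

Running this periodically while the clients are active would show whether the count keeps climbing toward the `maximumConnections` value set on the transport connector.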

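You need to understand why your setup creates so many open connections, and why they are not being closed if closing them is what you intend. The C# code looks a bit dodgy (nothing is closed anywhere), but since I can't see all of it, it may be fine.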
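
To illustrate why connections could pile up, here is a small sketch that uses a hypothetical stand-in class (`FakeConnection`) instead of a real JMS or NMS connection. It compares opening a new connection for every reply and never closing it with reusing one connection that is closed at the end. Whether the question's `Dispose()` actually closes the connection is not visible in the posted code, so the first variant is only a possible explanation, not a diagnosis.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedPublisherSketch {

    // Hypothetical stand-in for a broker connection; it only tracks how many are open.
    static class FakeConnection implements AutoCloseable {
        private final AtomicInteger open;

        FakeConnection(AtomicInteger open) {
            this.open = open;
            open.incrementAndGet();
        }

        void send(String text) {
            // A real client would publish the message here.
        }

        @Override
        public void close() {
            open.decrementAndGet();
        }
    }

    // One new connection per reply, never closed: open connections grow with every message.
    static int peakOpenWithConnectionPerMessage(int messages) {
        AtomicInteger open = new AtomicInteger();
        int peak = 0;
        for (int i = 0; i < messages; i++) {
            FakeConnection c = new FakeConnection(open); // deliberately leaked
            c.send("info");
            peak = Math.max(peak, open.get());
        }
        return peak;
    }

    // One connection reused for all replies and closed by try-with-resources.
    static int peakOpenWithSharedConnection(int messages) {
        AtomicInteger open = new AtomicInteger();
        int peak = 0;
        try (FakeConnection c = new FakeConnection(open)) {
            for (int i = 0; i < messages; i++) {
                c.send("info");
                peak = Math.max(peak, open.get());
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("per-message peak: " + peakOpenWithConnectionPerMessage(5));
        System.out.println("shared peak: " + peakOpenWithSharedConnection(5));
    }
}
```

With 1000 nodes each replying once a minute, the first pattern would add up to 1000 connections per minute if they are never released, while the second keeps one connection per node.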

This doesn't look like an ActiveMQ problem, but an application problem.