2017-04-10 48 views
0

我有一個使用ActiveMQ主題消息的羣集的tomcat。現在,如果集羣中的一個tomcat出現故障,那麼我猜測消費者的數量會減少1.檢測消費者ActiveMQ主題中的變化

現在,我想要在該主題上使用一些回調或偵聽器來檢測該更改。這是可行的嗎?

會是這樣的:Region.getDestinations(ActiveMQDestination)的工作?

回答

0

諮詢留言是你需要的。

每次您收到此代碼的消息時,這表示您有消費者啓動或停止。

DOC http://activemq.apache.org/advisory-message.html

例如:

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Queue; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.command.ActiveMQMessage; 
import org.apache.activemq.command.ConsumerInfo; 
import org.apache.activemq.command.RemoveInfo; 

public class AdvisorySupportConsumerAdvisoryTopic { 

    public static void main(String[] args) throws JMSException { 
     Connection conn = null; 
     try { 
      ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671"); 
      conn = cf.createConnection("admin", "admin"); 
      ActiveMQSession session = (ActiveMQSession) conn.createSession(false, 
        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 
      conn.start(); 
      Queue q = session.createQueue("Q"); 
      Destination advisoryDestination = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(q); 
      MessageConsumer consumer = session.createConsumer(advisoryDestination); 
      consumer.setMessageListener(new MessageListener() { 
       @Override 
       public void onMessage(Message msg) { 
        if (msg instanceof ActiveMQMessage) { 
         try { 
          ActiveMQMessage aMsg = (ActiveMQMessage) msg; 
          System.out.println(aMsg.getStringProperty("consumerCount")); 
          System.out.println(aMsg.getStringProperty("producerCount")); 
          if (aMsg.getDataStructure() instanceof ConsumerInfo) { 
           // Consumer start 
           ConsumerInfo consumerInfo = (ConsumerInfo) aMsg.getDataStructure(); 
           System.out.println(consumerInfo); 
          } else if (aMsg.getDataStructure() instanceof RemoveInfo) { 
           // Consumer stop 
           RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure(); 
           System.out.println(removeInfo); 
          } 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
      }); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (conn != null) { 
       try { 
        conn.close(); 
       } catch (Exception e) { 
       } 
      } 
     } 
    } 
} 
每次得到與該代碼,這意味着消息的時間

你有連接開始或停止。

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.command.ActiveMQMessage; 
import org.apache.activemq.command.ConnectionInfo; 
import org.apache.activemq.command.RemoveInfo; 

public class AdvisorySupportConnectionAdvisoryTopic { 

    public static void main(String[] args) throws JMSException { 
     Connection conn = null; 
     try { 
      ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671"); 
      conn = cf.createConnection("admin", "admin"); 
      ActiveMQSession session = (ActiveMQSession) conn.createSession(false, 
        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 
      conn.start(); 
      Destination advisoryDestination = org.apache.activemq.advisory.AdvisorySupport.getConnectionAdvisoryTopic(); 
      MessageConsumer consumer = session.createConsumer(advisoryDestination); 
      consumer.setMessageListener(new MessageListener() { 
       @Override 
       public void onMessage(Message msg) { 
        if (msg instanceof ActiveMQMessage) { 
         try { 
          ActiveMQMessage aMsg = (ActiveMQMessage) msg; 
          if (aMsg.getDataStructure() instanceof ConnectionInfo) { 
           // Connection start 
           ConnectionInfo connectionInfo = (ConnectionInfo) aMsg.getDataStructure(); 
           System.out.println(connectionInfo); 
          } else if (aMsg.getDataStructure() instanceof RemoveInfo) { 
           // Connection stop 
           RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure(); 
           System.out.println(removeInfo); 
          } 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
      }); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (conn != null) { 
       try { 
        conn.close(); 
       } catch (Exception e) { 
       } 
      } 
     } 
    } 
}