2016-10-05 81 views
0

我有一個標準的ActiveMQ代理ActiveMQ的檢測新的消費主題

private static String localVMurl = "vm://localhost"; 

    broker = new BrokerService(); 
    broker.addConnector(localVMurl); 
    broker.start(); 

,一切都很好。我的目標是,消費者通過特定主題與經紀人建立聯繫。一旦這個連接是detectd如果一個生產者正在主動生產該主題,代理將傳遞消息,或者該代理將爲該特定主題啓動一個新的生產者。但是爲了做到這一點,我需要有人檢測新消費者何時連接並請求特定主題。

我的基本消費代碼:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(remoterURL); 
    Connection connection = connectionFactory.createConnection(); 
    connection.setClientID("clinet1"); 
    connection.start(); 

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    Topic topic = session.createTopic("some_topic"); 
    MessageConsumer consumer = session.createConsumer(topic); 
    consumer.setMessageListener(new MyMessageListener()); 

,我可以在代理日誌中看到:

<161005 11:03:41> [.0.1:[email protected]] DEBUG tRegion - localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Topic.some_topic 

所以我知道消費者連接,並訂閱這個話題,我只需要趕上不知怎的,這個事件。

任何想法如何做到這一點?

回答

1

諮詢留言是你需要的。每次您收到此代碼的消息時,這意味着您有新的消費者啓動或停止。

DOC http://activemq.apache.org/advisory-message.html

例如:

//org.apache.activemq.advisory.AdvisorySupport.getDestinationAdvisoryTopic(Destination) 
Destination advisoryDestination = AdvisorySupport.getConsumerAdvisoryTopic(topic) 
MessageConsumer consumer = session.createConsumer(advisoryDestination); 
consumer.setMessageListener(this); 

public void onMessage(Message msg){ 
    if (msg instanceof ActiveMQMessage){ 
     try { 
      ActiveMQMessage aMsg = (ActiveMQMessage)msg; 
      ConsumerInfo consumer = (ConsumerInfo) aMsg.getDataStructure(); 
     } catch (JMSException e) { 
      log.error("Failed to process message: " + msg); 
     } 
    } 
} 
+0

我需要監聽的經紀人,而不是消費者。經紀人不知道消費者會提前訂閱哪個主題。這看起來不像我想要的? – user1772250

+0

你能解釋一下嗎,我的理解是**所以我知道消費者連接,並訂閱這個主題,我只需要以某種方式捕捉該事件。**和我的代碼連接到經紀人,並聽取它,做什麼你的意思是**我需要聽取經紀人,而不是消費者。** –

+0

如果您需要此事件而不事先知道目的地,您可以用'主題'替換'Destination advisoryDe​​stination = AdvisorySupport.getConsumerAdvisoryTopic(topic);' advisoryDe​​stination = session.createTopic(「ActiveMQ.Advisory.Consumer.Topic。*」);'在代理中通知任何訂閱任何主題,並且通過使用ConsumerInfo,您可以檢索另一個消息consumer.getDestination( );' –