2014-04-15 71 views
1

我有一個超級簡單的場景:一個經紀人和一個持久訂閱的消費者。 這是我的消費者應用程序的代碼:ActiveMQ故障轉移似乎不起作用

package test; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 

import org.apache.activemq.ActiveMQConnectionFactory; 

import pojo.Event; 
import pojo.StockUpdate; 

public class Consumer 
{ 

    private static transient ConnectionFactory factory; 
    private transient Connection connection; 
    private transient Session session; 
    public static int counter = 0; 

    public Consumer(String brokerURL) throws JMSException 
    { 
     factory = new ActiveMQConnectionFactory(brokerURL); 
     connection = factory.createConnection(); 
     connection.setClientID("CLUSTER_CLIENT_1"); 
     connection.start(); 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    } 

    public void close() throws JMSException 
    { 
     if (connection != null) 
     { 
      connection.close(); 
     } 
    } 

    public static void main(String[] args) throws JMSException 
    { 

     try 
     { 
      // extract topics from the rest of arguments 
      String[] topics = new String[2]; 
      topics[0] = "CSCO"; 
      topics[1] = "ORCL"; 

      // define connection URI 
      Consumer consumer = new Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true"); 

      for (String stock : topics) 
      { 
       try 
       { 
        Destination destination = consumer.getSession().createTopic("STOCKS." + stock); 
        // consumer.getSession(). 
        MessageConsumer messageConsumer = consumer.getSession().createDurableSubscriber((Topic) destination, "STOCKS_DURABLE_CONSUMER_" + stock); 
        messageConsumer.setMessageListener(new Listener()); 
       } 
       catch (JMSException e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     } 
     catch (Throwable t) 
     { 
      t.printStackTrace(); 
     } 

    } 

    public Session getSession() 
    { 
     return session; 
    } 

} 

class Listener implements MessageListener 
{ 

    public void onMessage(Message message) 
    { 
     try 
     { 
      TextMessage textMessage = (TextMessage) message; 
      String json = textMessage.getText(); 
      Event event = StockUpdate.fromJSON(json, StockUpdate.class); 
      System.out.println("Consumed message #:" + ++Consumer.counter + "\n" + event); 
     } 
     catch (Exception e) 
     { 
      e.printStackTrace(); 
     } 
    } 

} 

這裏是我的activemq.xml中

<beans 
    xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

    <!-- Allows us to use system properties as variables in this configuration file --> 
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 
     <property name="locations"> 
      <value>file:${activemq.conf}/credentials.properties</value> 
     </property> 
    </bean> 

    <!-- 
     The <broker> element is used to configure the ActiveMQ broker. 
    --> 
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="R6_cluster_broker1" persistent="true"> 

     <networkConnectors> 
      <networkConnector uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/> 
     </networkConnectors> 

     <destinationPolicy> 
      <policyMap> 
       <policyEntries> 
       <policyEntry topic=">" > 
        <!-- The constantPendingMessageLimitStrategy is used to prevent 
         slow topic consumers to block producers and affect other consumers 
         by limiting the number of messages that are retained 
         For more information, see: 

         http://activemq.apache.org/slow-consumer-handling.html 

        --> 
        <pendingMessageLimitStrategy> 
        <constantPendingMessageLimitStrategy limit="1000"/> 
        </pendingMessageLimitStrategy> 
       </policyEntry> 
       </policyEntries> 
      </policyMap> 
     </destinationPolicy> 


     <!-- 
      The managementContext is used to configure how ActiveMQ is exposed in 
      JMX. By default, ActiveMQ uses the MBean server that is started by 
      the JVM. For more information, see: 

      http://activemq.apache.org/jmx.html 
     --> 
     <managementContext> 
      <managementContext createConnector="false"/> 
     </managementContext> 

     <!-- 
      Configure message persistence for the broker. The default persistence 
      mechanism is the KahaDB store (identified by the kahaDB tag). 
      For more information, see: 

      http://activemq.apache.org/persistence.html 
     --> 
     <persistenceAdapter> 
      <kahaDB directory="/work/temp/kahadb"/> 
     </persistenceAdapter> 


      <!-- 
      The systemUsage controls the maximum amount of space the broker will 
      use before disabling caching and/or slowing down producers. For more information, see: 
      http://activemq.apache.org/producer-flow-control.html 
      --> 
      <systemUsage> 
      <systemUsage> 
       <memoryUsage> 
        <memoryUsage percentOfJvmHeap="70" /> 
       </memoryUsage> 
       <storeUsage> 
        <storeUsage limit="100 gb"/> 
       </storeUsage> 
       <tempUsage> 
        <tempUsage limit="50 gb"/> 
       </tempUsage> 
      </systemUsage> 
     </systemUsage> 

     <!-- 
      The transport connectors expose ActiveMQ over a given protocol to 
      clients and other brokers. For more information, see: 

      http://activemq.apache.org/configuring-transports.html 
     --> 
     <transportConnectors> 
      <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> --> 
     </transportConnectors> 

     <!-- destroy the spring context on shutdown to stop jetty --> 
     <shutdownHooks> 
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> 
     </shutdownHooks> 

    </broker> 

    <!-- 
     Enable web consoles, REST and Ajax APIs and demos 
     The web consoles requires by default login, you can disable this in the jetty.xml file 

     Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details 
    --> 
    <import resource="jetty.xml"/> 

</beans> 

當我有兩個經紀人和消費者的運行,然後停止我的消費者離開後幾分鐘代理。據我瞭解,它必須嘗試重新連接,但事實並非如此。我做錯了什麼,請指教。

!注意!我在Eclipse中啓動消費者,我不爲此任務構建獨立的jar。

我已經將我的經紀人更新到最新的5.9.1版本,並且對我的消費者也這樣做。結果是一樣的 - 在我停止經紀人後,我的消費者在幾秒鐘後死亡。如果經紀人正在運行,它會正常工作。

+0

我已經將我的經紀人更新至最新的5.9.1版,並且對我的消費者也做了同樣的處理。結果是一樣的 - 在我停止經紀人後,我的消費者在幾秒鐘後死亡。如果經紀人正在運行,它會正常工作。 –

回答

1

好的,問題實際上是在我的代碼中:沒有任何東西可以阻止主線程退出。由於實現故障轉移的線程是一個守護進程線程,因此消費者應用程序在沒有任何東西駐留(沒有非守護進程線程)之後立即終止。

+0

我剛剛發現了同樣的「問題」。感謝您確認原因! 你實施了什麼樣的解決方案來避免主線程終止? –

+0

據我記得,我已經添加了關機鉤子。 –

0

很可能你正在使用的ActiveMQ版本有一個錯誤,導致所有的守護進程線程,這意味着沒有什麼可以保持客戶端運行。升級到更高版本,如v5.9.1,看看是否有幫助。如果沒有發佈更多的信息,因爲你沒有提供太多的信息。

+0

Tim,我正在使用Apache ActiveMQ 5.9.0 您希望我發佈什麼額外信息? –

+0

我已更新所有信息的問題。 –

+0

我可以用'5.13.2'確認行爲。 –