2012-11-07 72 views
1

我想擺脫某些隊列中的所有「消費者數量」。每當我清除/刪除隊列時,如果我再次使用相同的名稱創建該隊列,則消費者的數量仍然保留。即使有0個未決消息,仍然有6個消費者。如何殺死activemq中的消費者

我的問題可能會阻止我的java代碼,而不會關閉會話或連接。

我已經嘗試重新啓動並重新安裝服務器。

這裏是我的製片代碼:

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; 

    public static String addElementToQueue(String queueName,String param1, String param2) throws JMSException, NamingException { 
    // Getting JMS connection from the server and starting it 
     ConnectionFactory connectionFactory = 
       new ActiveMQConnectionFactory(url); 
     Connection connection = connectionFactory.createConnection(); 

// JMS messages are sent and received using a Session. We will 
     // create here a non-transactional session object. If you want 
     // to use transactions you should set the first parameter to 'true' 
     Session session = connection.createSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     // Destination represents here our queue on the 
     // JMS server. You don't have to do anything special on the 
     // server to create it, it will be created automatically. 
     Destination destination = session.createQueue(queueName); 

     // MessageProducer is used for sending messages (as opposed 
     // to MessageConsumer which is used for receiving them) 
     MessageProducer producer = session.createProducer(destination); 

     String queueMessage = param1+ "-" + param2; 

     TextMessage message = session.createTextMessage(queueMessage); 

     // Here we are sending the message! 
     producer.send(message); 

     connection.close(); 
     session.close();  // added after problem came up 
     producer.close();  // added after problem came up 

     return commandID; 
} 

這裏是我的消費者代碼:

// URL of the JMS server 
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; 

    public static Pair consumeNextElement(String queueName) throws JMSException { 
     // Getting JMS connection from the server 
     ConnectionFactory connectionFactory 
       = new ActiveMQConnectionFactory(url); 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 

     // Creating session for seding messages 
     Session session = connection.createSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     // Getting the queue 
     Destination destination = session.createQueue(queueName); 

     // MessageConsumer is used for receiving (consuming) messages 
     MessageConsumer consumer = session.createConsumer(destination); 


     // Here we receive the message. 
     // By default this call is blocking, which means it will wait 
     // for a message to arrive on the queue. 
     Message message = consumer.receive(); 

     // There are many types of Message and TextMessage 
     // is just one of them. Producer sent us a TextMessage 
     // so we must cast to it to get access to its .getText() 
     // method. 

     String[] parts = ((TextMessage)message).getText().split("-"); 
     Pair retVal = new Pair(parts[0], parts[1]); 

     connection.close(); 
     session.close();  // added after problem came up 
     consumer.close();  // added after problem came up 

     return retVal; 
    } 

有什麼想法?

謝謝。

+0

發佈一些代碼! –

+0

我的編輯包含消費者/生產者代碼。謝謝。 –

+0

在清除隊列後需要您殺死用戶的用例是什麼? –

回答

3

消費者的數量是隊列上的偵聽器的數量。清除隊列只應刪除已排隊的消息 - 那些正在偵聽的消費者將不受影響。

消費者維護/重建連接的能力可能取決於用於連接的傳輸,並且可能允許對連接屬性進行一些調整。

坦率地說,我沒有太多的經驗,但您可以調查Advisory Messages作爲幫助調試連接的方法。除報告消費者數量之外,JMX界面或Web控制檯似乎沒有幫助。