2016-09-07

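A better way to wait for asynchronous messages in ActiveMQ

I am using ActiveMQ to send messages and receive them asynchronously.

I am having trouble deciding on the best way to wait for the messages. Sleeping the thread in a loop is one option, but it doesn't feel right to me.

Can anyone suggest a better approach?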

AsyncReceiver.java

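import java.util.Properties;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class AsyncReceiver implements MessageListener, ExceptionListener {

    public static void main(String[] args) throws Exception {

        // JNDI settings for the ActiveMQ broker and the queue to look up
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue", "MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        // Register this class as both message listener and exception listener
        QueueReceiver queueReceiver = queueSession.createReceiver(queue);
        AsyncReceiver asyncReceiver = new AsyncReceiver();
        queueReceiver.setMessageListener(asyncReceiver);
        queueConn.setExceptionListener(asyncReceiver);
        queueConn.start();

        // Waiting for messages: keep the main thread alive for 10 seconds
        System.out.println("waiting for messages");
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
        }

        queueConn.close();
    }

    @Override
    public void onMessage(Message message) {
        // Ignore anything that is not a text message instead of failing on the cast
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            System.out.println("received: " + ((TextMessage) message).getText());
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void onException(JMSException exception) {
        System.err.println("an error occurred: " + exception);
    }
}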

Sender.java

import java.util.Properties;

import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Sender {

    public static void main(String[] args) throws Exception {

        // Same JNDI settings as the receiver, so both sides use the same queue
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue", "MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);

        // Send a single non-persistent text message
        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = queueSession.createTextMessage("Hello");
        queueSender.send(message);
        System.out.println("sent: " + message.getText());

        queueConn.close();
    }
}

Answer

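There are two ways to process (consume) the messages in a queue.

  1. Check the queue periodically for new messages. This suits a program that runs on a schedule, e.g. once or twice a day. You can do this with a loop and a thread sleep.

  2. Register a consumer (using a MessageListener) with the queue, as in the Consumer.java and YourClass.java example below.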
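
For option 1, a bounded blocking receive can replace an explicit Thread.sleep: in JMS, MessageConsumer.receive(long timeout) returns null when no message arrives within the timeout. The sketch below only mirrors that call so it can run without a broker. PollingSketch, TimedSource, drain, the in-memory queue, and the 200 ms timeout are all assumptions made for illustration, not part of ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingSketch {

    /** Hypothetical stand-in for the one MessageConsumer method used here. */
    interface TimedSource {
        // Returns the next message, or null if none arrives within timeoutMillis.
        String receive(long timeoutMillis);
    }

    /** Collects messages until a single receive call times out. */
    static List<String> drain(TimedSource source, long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        String next;
        while ((next = source.receive(timeoutMillis)) != null) {
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) {
        // In-memory queue standing in for the broker queue (assumption).
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello 1");
        queue.add("Hello 2");

        TimedSource source = timeout -> {
            try {
                return queue.poll(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null; // treat interruption like a timeout
            }
        };

        System.out.println("received: " + drain(source, 200));
    }
}
```

With a real consumer, the lambda body would presumably call consumer.receive(timeout) and extract the text, and a scheduled job would call drain once per run.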

Consumer.java

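import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

// URL (the broker URL) and queueName are assumed to be defined elsewhere.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);

// Messages are delivered to YourClass.onMessage on a provider-managed thread.
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new YourClass());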

YourClass.java

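import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class YourClass implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                String inputJsonString = ((TextMessage) message).getText();
                // Do whatever you want with inputJsonString.
            }
            // Not needed with AUTO_ACKNOWLEDGE; it matters only with CLIENT_ACKNOWLEDGE.
            message.acknowledge();
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}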
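
With a listener registered, the main thread still has to stay alive until the messages have arrived. One common alternative to the sleep loop in the question is to block on a java.util.concurrent.CountDownLatch that the listener counts down. The sketch below shows only that waiting pattern using the JDK, with no broker involved. CountingListener, onText, and the delivery thread are hypothetical stand-ins for the MessageListener, onMessage, and the provider's delivery thread; the three expected messages and the 10-second timeout are assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    /** Stand-in for a MessageListener: records each text, then counts the latch down. */
    static class CountingListener {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done;

        CountingListener(int expectedMessages) {
            this.done = new CountDownLatch(expectedMessages);
        }

        // In a real receiver this body would live in onMessage(Message).
        void onText(String text) {
            received.add(text);
            done.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountingListener listener = new CountingListener(3);

        // Hypothetical delivery thread standing in for the JMS provider's thread.
        Thread delivery = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                listener.onText("Hello " + i);
            }
        });
        delivery.start();

        // Block until all expected messages arrive, or give up after 10 seconds.
        boolean all = listener.done.await(10, TimeUnit.SECONDS);
        System.out.println("all received: " + all + ", count: " + listener.received.size());
    }
}
```

In a real receiver, the countDown() call would sit at the end of onMessage, and main would call await(...) just before closing the connection. A receiver that should run until shutdown could use a latch of one that a shutdown hook releases.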

Thank you very much, NPrasad. You are a lifesaver! – namalfernandolk