2017-07-03 160 views
0

我使用Apache ActiveMQ對大量消息進行排隊,然後在一天結束時將它們出列。不過,我對ActiveMQ的運作方式感到困惑。在我的PC上,我沒有安裝ActiveMQ作爲服務,也沒有在某處安裝服務器。我剛纔包括「ActiveMQ的 - 全5.14.5.jar」作爲我的項目Maven的依賴,我使用下面的代碼至今:以編程方式配置Apache ActiveMQ

public static void main(String[] args) throws URISyntaxException, Exception { 
    BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)")); 
    broker.start(); 
    Connection connection = null; 
    try { 
     // Producer 
     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848"); 
     connection = connectionFactory.createConnection(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Queue queue = session.createQueue("customerQueue"); 
     String payload = "Important Task"; 
     Message msg = session.createTextMessage(payload); 
     MessageProducer producer = session.createProducer(queue); 
     System.out.println("Sending text '" + payload + "'"); 
     msg.setLongProperty("_AMQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000L); 
     producer.send(msg); 

     // Consumer 
     MessageConsumer consumer = session.createConsumer(queue); 
     connection.start(); 
     QueueBrowser browser = session.createBrowser(queue); 
     while (browser.getEnumeration().hasMoreElements()) { 
      TextMessage textMsg = (TextMessage) consumer.receive(); 
      browser.getEnumeration().nextElement(); 
      System.out.println(textMsg); 
      System.out.println("Received: " + textMsg.getText()); 
     } 

     session.close(); 
    } finally { 
     if (connection != null) { 
      connection.close(); 
     } 
     broker.stop(); 
    } 
} 

正如你所看到的,我不想耽誤一消息5秒(或更多,可能會有所不同),但在我找到的每個指南中,我都會指示配置XML配置文件。但是,這是在您將ActiveMQ作爲服務運行時使用的文件。我目前僱用只是罐庫

最初,我已經安裝了Glassgfish服務器,以便使用JMS來排隊所有消息,但自那之後我放棄了該項目,但IP仍在使用ActiveMQ(localhost:4848)。

請注意,以下是一個非常有效的示例-KahaDB也用於在發生服務器故障時存儲消息。

就我而言,ActiveMQ確實從STS啓動本地服務器,我正在運行此代碼,但配置文件在哪裏?我可以通過編程方式更改其屬性

+0

你嘗試像'broker.setSchedulerSupport(真)'? (請參閱http://activemq.apache.org/maven/5.11.0/apidocs/org/apache/activemq/broker/BrokerService.html#setSchedulerSupport(boolean)) – Tome

+0

我剛剛做了,它不起作用。 – Lefteris008

+0

你確定你使用的_AMQ_SCHED_DELIVERY屬性? ActiveMQ屬性應該是'AMQ_SCHEDULED_DELAY'(請參閱http://activemq.apache.org/delay-and-schedule-message-delivery.html) – Tome

回答

2

這應該工作(適用於我們與ActiveMQ 5.12.3)。請務必先清理您的KahaDB商店,以避免先前的消息被讀取。

public static void main(String[] args) throws Exception { 
    BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)")); 
    broker.setSchedulerSupport(true); 
    broker.start(); 
    Connection connection = null; 
    try { 
     // Producer 
     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848"); 
     connection = connectionFactory.createConnection(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Queue queue = session.createQueue("customerQueue"); 
     String payload = "Important Task"; 
     Message msg = session.createTextMessage(payload); 
     MessageProducer producer = session.createProducer(queue); 
     System.out.println("Sending text '" + payload + "'"); 
     msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L); 
     producer.send(msg); 
     connection.start(); 

     // Consumer 
     MessageConsumer consumer = null; 
     consumer = session.createConsumer(queue); 

     QueueBrowser browser = session.createBrowser(queue); 
     while (browser.getEnumeration().hasMoreElements()) { 
      TextMessage textMsg = (TextMessage) consumer.receive(); 
      browser.getEnumeration().nextElement(); 
      System.out.println(textMsg); 
      System.out.println("Received: " + textMsg.getText()); 
     } 

     session.close(); 
    } finally{ 
     if (connection != null) { 
      connection.close(); 
     } 
     broker.stop(); 
    } 
} 

第一清潔運行(與空KahaDB店)不應輸出

「收稿日期:重要任務」

,而第二個會,如果你不刪除中間的數據文件。

卸下線`

msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,5000L);

將使第一清潔運行輸出「接收:重要任務」

+0

刪除'KahaDB'解決了這個問題。謝謝! – Lefteris008