背景: 我有一個標準的生產者消費者隊列,消費者緩慢,而生產者速度很快。期望是每當生產者完成所請求的消息時,它就認可該消息,並且生產者將假定與消息相關的任務完成。由於生產者速度很快,我不希望生產線等待,相反,只要消息被確認,就應該調用回調。由於JMS在這方面受到限制,並且我儘可能直接使用了像ActiveMQMessageProducer
這樣的ActiveMQ類。ActiveMQ實現異步確認JAVA 8
問題: 消息正在自動確認,即使Consumer尚未啓動,註冊的異步回調也會被調用。 public void send(Destination destination, Message message, AsyncCallback onComplete)
生產者
public static boolean setup() {
Producer.connectionFactory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// Create a Connection
Producer.connection =
(ActiveMQConnection)connectionFactory.createConnection();
connection.setAlwaysSessionAsync(true);
connection.start();
}
public Producer() {
session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
producer = (ActiveMQMessageProducer)session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
...
public void run() {
long id = messageID.getAndIncrement();
String text = "Hello world!"
Message message = session.createTextMessage(text);
producer.send(message, new MessageCompletion(id, this.messageRundown));
}
消費者
public static boolean setup() {
Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection();
connection.setAlwaysSessionAsync(true);
return true;
}
public Consumer() {
session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
consumer.setMessageListener(this);
connection.start();
}
// implements MessageListener
@Override
public void onMessage(Message message) {
messageQueue.add(message);
}
public void run() {
while(true) {
Message message = messageQueue.poll();
while(message != null) {
// do some work
message.acknowledge();
message = messageQueue.poll();
}
Thread.sleep(10000);
}
}
雖然不需要消費者我將它作爲參考,東西已被刪除,以確保簡潔,這是一部分工作代碼。
謝謝!我現在明白了這一點,異步回調實現的JMSException處理程序現在也加起來了。因此,對於高性能,持久的消息隊列,保持異步以確保我們不會在這方面花費太多時間仍然是有意義的。 – amritanshu
對於持久性消息,您無需權衡是否想知道每條消息實際上何時會保持持久,如果有任何性能提升,就會獲得很小的消息。 –