0
對於請求/響應,我需要一個臨時隊列來應答。我想創建一個隊列並始終保持打開狀態(而不是使用SessionCallback.doInJms()
爲每個請求創建一個新的隊列)。如何在Spring中創建臨時JMS隊列?
我如何用Spring的JMS支持來做到這一點?
對於請求/響應,我需要一個臨時隊列來應答。我想創建一個隊列並始終保持打開狀態(而不是使用SessionCallback.doInJms()
爲每個請求創建一個新的隊列)。如何在Spring中創建臨時JMS隊列?
我如何用Spring的JMS支持來做到這一點?
我找不到辦法做到這一點,所以我創建了一個解決方法。這個類將保持會話和連接打開,直到上下文被銷燬。這樣,你可以確定你會得到每一個答覆。其他代碼通常會發送消息,打開回復隊列,然後有時會看不到回覆,因爲它是在發件人打開回復隊列之前發送的。
用法:
@Bean
public JmsTemplate replyJmsTemplate() {
JmsTemplate result = new JmsTemplate(jmsConnectionFactory());
result.setDefaultDestination(replyQueueProvider().getQueue());
result.setReceiveTimeout(10000);
return result;
}
@Bean
public QueueProvider replyQueueProvider() {
QueueProvider result = new QueueProvider(jmsConnectionFactory());
result.init(); // Must call manually; no @PostConstruct!
return result;
}
實現:
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.UncategorizedJmsException;
import org.springframework.jms.support.JmsUtils;
public class QueueProvider {
private static final Logger log = LoggerFactory.getLogger(QueueProvider.class);
private static final AtomicInteger COUNT = new AtomicInteger();
private final ConnectionFactory connectionFactory;
private String queueName;
private boolean isTemporary;
private Connection connection;
private Session session;
private Queue queue;
private boolean transacted;
private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
public QueueProvider(ConnectionFactory connectionFactory, String queueName) {
this.connectionFactory = connectionFactory;
this.queueName = queueName;
}
public QueueProvider(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
this.isTemporary = true;
this.queueName = "TemporaryQueue-" + COUNT.incrementAndGet();
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public boolean getTransacted() {
return transacted;
}
public void setAcknowledgeMode(int acknowledgeMode) {
this.acknowledgeMode = acknowledgeMode;
}
public int getAcknowledgeMode() {
return acknowledgeMode;
}
public void init() {
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(transacted, acknowledgeMode);
log.debug("Opening queue {}", queueName);
if (isTemporary) {
queue = session.createTemporaryQueue();
} else {
queue = session.createQueue(queueName);
}
} catch(Exception e) {
throw new UncategorizedJmsException("Error creating queue " + queueName, e);
}
}
@PreDestroy
public void close() {
log.debug("Closing queue {}", queueName);
queue = null;
JmsUtils.closeSession(session);
JmsUtils.closeConnection(connection);
}
public Queue getQueue() {
if(null == queue) {
throw new IllegalStateException("Either init() wasn't called or close() was already called");
}
return queue;
}
}