閱讀郵件我試圖創建一個使用使用DefaultMessageListenerContainer收聽來自IBM MQ消息一些春季啓動代碼。使用DefaultMessageListenerContainer無法從IBM MQ
我可以創建MQQueueConnectionFactory發送和接收使用JmsTemplate的消息,但是這是爲了通過高通量,要使用偵聽器,而不是投票。
我有很多整合的代碼到一個組件,所以我希望我所有的是相關的。
如果我安排receiveMessage方法,它拿起排隊的消息,所以我知道的sendMessage方法在排隊等待消息。
@Component
class AllInOneTest {
private MessagingConfiguration.QueueConfig config;
private MQQueueConnectionFactory connectionFactory;
private JmsTemplate jmsTemplate;
private DefaultMessageListenerContainer listenerContainer;
private final Logger logger = LoggerFactory.getLogger(getClass());
public AllInOneTest(MessagingManager manager) throws JMSException {
String detailsName = "default";
config = manager.getMessagingDetails(detailsName).getConfig();
logger.debug("AllInOneTest Initializing Connection Factory: {}", detailsName);
connectionFactory = new MQQueueConnectionFactory();
connectionFactory.setHostName(config.getHost());
connectionFactory.setPort(config.getPort());
connectionFactory.setTransportType(config.getTransportType());
connectionFactory.setQueueManager(config.getQueueManager());
connectionFactory.setChannel(config.getChannel());
logger.debug("AllInOneTest Initializing Message Listener: {}", detailsName);
DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();
defaultListener.setConnectionFactory(connectionFactory);
defaultListener.setExceptionListener((ee) -> {
logger.warn(String.format("AllInOneTest Message Listener Error: %s", detailsName), ee);
});
defaultListener.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created Listener Destination: {}", ret);
return ret;
});
defaultListener.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
logger.info("AllInOneTest Listening For Message: {}", message);
}
});
// TODO Configure subscription.
// defaultListener.setSubscriptionDurable(true);
// defaultListener.setSubscriptionName("masher-service");
// TODO Configure concurrency.
// defaultListener.setConcurrency(config.getConcurrency());
// TODO Configure transaction.
// defaultListener.setSessionTransacted(config.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
listenerContainer = defaultListener;
logger.debug("AllInOneTest Initializing JMS Template: {}", detailsName);
jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(new SpringToJMSMessageConverter());
jmsTemplate.setReceiveTimeout(1000L);
jmsTemplate.setDefaultDestinationName(config.getOutputQueue());
jmsTemplate.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created JMS Template Destination: {}", ret);
return ret;
});
listenerContainer.setDestinationName(config.getOutputQueue());
logger.debug("AllInOneTest Starting Message Listener: {} on {}", detailsName, config.getOutputQueue());
listenerContainer.start();
}
// @Scheduled(fixedRate = 500L)
public void receiveMessage() {
Object message = jmsTemplate.receiveAndConvert();
if (message != null) {
logger.info("AllInOneTest Received: {}", message);
}
}
@Scheduled(fixedRate = 1500L)
public void sendMessage() {
int count = counter.incrementAndGet();
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(String.format("JMS Masher Message %d %s %s", count,
new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()), UUID.randomUUID().toString())).build();
logger.info("AllInOneTest Sending: {} [{}]", message.getPayload(), message.getHeaders());
jmsTemplate.convertAndSend(config.getInputQueue(), message);
}
}
我打電話DefaultMessageListenerContainer.start(),但我得到的感覺是不是「開始」,我必須失去了一些東西。
DestinationResolver的被調用JmsTemplate的,但不使用DefaultMessageListenerContainer。
我在控制檯中看不到任何異常。
感謝您的幫助, 韋斯。
啊。我在一些不相關的樣本中看到了這一點。我有一個'@ Bean'和'@ JmsListener'實現,但我可能需要根據配置需要配置一個或多個偵聽器。我使用的是Spring,因此我不需要編寫過程代碼,但需求並不總是表現正確。謝謝。 – Wes
這很好,理解;但是每當你自己實例化Spring提供的組件時,你應該總是檢查它們是否實現'InitializingBean',並且如果他們確實需要調用'afterPropertiesSet()'。 –