我是新來的spring amqp,我想創建兩個不同行爲的不同監聽器。問題是我在編譯時不知道隊列名稱,所以我不能使用this解決方案。java spring boot amqp具有不同行爲的兩個監聽器
我想要做的事情是:從「sidechannel」隊列中讀取(然後刪除)第一條消息,它應該看起來像這樣{"queues":["queue1","queue2"]}
。
現在從隊列1和隊列2讀取(然後刪除)第一條消息。在此之後,請轉到步驟1,閱讀側通道的第一條消息
我嘗試使用不同的偵聽器創建2個SimpleMessageListenerContainers,如下面的代碼所示,但它不起作用,因爲我認爲它會起作用。
我的代碼:
@SpringBootApplication
public class Main implements CommandLineRunner {
final static String queueName = "sidechannel";
@Autowired
AnnotationConfigApplicationContext context;
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames(queueName);
container.setMessageListener(sidechannelListener());
return container;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer2() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames("queue1","queue2");
container.setMessageListener(queueListener());
return container;
}
@Bean
public MessageListener sidechannelListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println(msg);
try {
Map<String, Object> map = jsonToMap(msg);
for (String name : (ArrayList<String>) map.get("queues")) {
System.out.println("Waiting for " + name + " message");
rabbitTemplate.receive(name);
}
} catch (IOException e) {
e.printStackTrace();
}
};
}
@Bean
public MessageListener queueListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println("Received message: ");
System.out.println(msg);
};
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Main.class, args);
}
@Override
public void run(String... args) throws Exception {
rabbitTemplate.setReceiveTimeout(-1);
while(true) {
System.out.println("Waiting for side channel message");
rabbitTemplate.receive(queueName);
}
// context.close();
}
}
首先,由於某種原因,在側通道隊列中的消息將不被處理後除去。 其次,當我期待像這樣的輸出:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
Waiting for queue2 message
"message from queue2"
Waiting for side channel message
那就算我對這些不同的隊列中接收到的消息,什麼都不會發生(因爲rabbitTemplate.setReceiveTimeout(-1);
),但不知何故,它反應的是我收到的每封郵件...
另外,我不明白的是,如果我第一次將消息發送到側通道,然後QUEUE1它是這樣:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
,現在,如果我再派(一秒)消息到queue1,它打印出th電子郵件,然後Waiting for queue2 message
。
所以它需要兩個信息來繼續週期...我不知道我在做什麼錯。
對我來說看起來過於複雜,爲什麼不使用配置?在啓動時指定隊列名稱(如果可以將它們放在交換機上,也可以將它們放入屬性文件中)。 –
這可能是非常複雜的,正如我所說的,我是春季amqp的新手,我仍然無法理解它。但由於我不知道編譯時的隊列名稱,我不能將它們添加到屬性文件,或者我錯了嗎?無論如何,Gary的回答幫助我解決了這個問題,感謝您的回覆:) –
您不需要在編譯時瞭解它們。我建議閱讀參考指南。你可以在啓動時提供屬性,屬性可以在以後定義,你只需要一個固定的屬性名稱。 –