2016-05-17 48 views
2

我是新來的spring amqp,我想創建兩個不同行爲的不同監聽器。問題是我在編譯時不知道隊列名稱,所以我不能使用this解決方案。java spring boot amqp具有不同行爲的兩個監聽器

我想要做的事情是:從「sidechannel」隊列中讀取(然後刪除)第一條消息,它應該看起來像這樣{"queues":["queue1","queue2"]}

現在從隊列1和隊列2讀取(然後刪除)第一條消息。在此之後,請轉到步驟1,閱讀側通道的第一條消息

我嘗試使用不同的偵聽器創建2個SimpleMessageListenerContainers,如下面的代碼所示,但它不起作用,因爲我認爲它會起作用。

我的代碼:

@SpringBootApplication 
public class Main implements CommandLineRunner { 

final static String queueName = "sidechannel"; 

@Autowired 
AnnotationConfigApplicationContext context; 

@Autowired 
RabbitTemplate rabbitTemplate; 

@Bean 
Queue queue() { 
    return new Queue(queueName, false); 
} 

@Bean 
TopicExchange exchange() { 
    return new TopicExchange("spring-boot-exchange"); 
} 

@Bean 
Binding binding(Queue queue, TopicExchange exchange) { 
    return BindingBuilder.bind(queue).to(exchange).with(queueName); 
} 

@Bean 
public ConnectionFactory rabbitConnectionFactory() { 
    CachingConnectionFactory connectionFactory = 
      new CachingConnectionFactory("localhost"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    return connectionFactory; 
} 

@Bean 
public SimpleMessageListenerContainer messageListenerContainer() { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(rabbitConnectionFactory()); 
    container.setQueueNames(queueName); 
    container.setMessageListener(sidechannelListener()); 
    return container; 
} 

@Bean 
public SimpleMessageListenerContainer messageListenerContainer2() { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(rabbitConnectionFactory()); 
    container.setQueueNames("queue1","queue2"); 
    container.setMessageListener(queueListener()); 
    return container; 
} 



@Bean 
public MessageListener sidechannelListener() { 
    return message -> { 
     String msg = new String(message.getBody()); 
     System.out.println(msg); 
     try { 
      Map<String, Object> map = jsonToMap(msg); 
      for (String name : (ArrayList<String>) map.get("queues")) { 
       System.out.println("Waiting for " + name + " message"); 
       rabbitTemplate.receive(name); 
      } 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

    }; 
} 

@Bean 
public MessageListener queueListener() { 
    return message -> { 
     String msg = new String(message.getBody()); 
     System.out.println("Received message: "); 
     System.out.println(msg); 
    }; 
} 


public static void main(String[] args) throws InterruptedException { 
    SpringApplication.run(Main.class, args); 
} 

@Override 
public void run(String... args) throws Exception { 
    rabbitTemplate.setReceiveTimeout(-1); 

    while(true) { 
     System.out.println("Waiting for side channel message"); 
     rabbitTemplate.receive(queueName); 
    } 
//  context.close(); 
} 
} 

首先,由於某種原因,在側通道隊列中的消息將不被處理後除去。 其次,當我期待像這樣的輸出:

Waiting for side channel message 
{"queues":["queue1","queue2"]} 
Waiting for queue1 message 
Received message: 
"message from queue1" 
Waiting for queue2 message 
"message from queue2" 
Waiting for side channel message 

那就算我對這些不同的隊列中接收到的消息,什麼都不會發生(因爲rabbitTemplate.setReceiveTimeout(-1);),但不知何故,它反應的是我收到的每封郵件...

另外,我不明白的是,如果我第一次將消息發送到側通道,然後QUEUE1它是這樣:

Waiting for side channel message 
{"queues":["queue1","queue2"]} 
Waiting for queue1 message 
Received message: 
"message from queue1" 

,現在,如果我再派(一秒)消息到queue1,它打印出th電子郵件,然後Waiting for queue2 message

所以它需要兩個信息來繼續週期...我不知道我在做什麼錯。

+0

對我來說看起來過於複雜,爲什麼不使用配置?在啓動時指定隊列名稱(如果可以將它們放在交換機上,也可以將它們放入屬性文件中)。 –

+0

這可能是非常複雜的,正如我所說的,我是春季amqp的新手,我仍然無法理解它。但由於我不知道編譯時的隊列名稱,我不能將它們添加到屬性文件,或者我錯了嗎?無論如何,Gary的回答幫助我解決了這個問題,感謝您的回覆:) –

+0

您不需要在編譯時瞭解它們。我建議閱讀參考指南。你可以在啓動時提供屬性,屬性可以在以後定義,你只需要一個固定的屬性名稱。 –

回答

0

你似乎在混合範例;您有偵聽器容器,它們是由消息驅動的,並且您還在使用輪詢(template.receive())。一般地,容器隊列1,隊列2將已經處理了來自這些隊列中的消息和這個

  System.out.println("Waiting for " + name + " message"); 
      rabbitTemplate.receive(name); 

將永遠阻止如果超時是< 0;因此原始消息將永遠不會被刪除。

+0

你是對的,我不知道聽者容器是消息驅動的,不知怎的解釋它,我可以解決它,非常感謝你! –