2016-02-23 140 views
4

我創造,我想接收發送到AMQP(兔)交換(從另一個應用程序)的消息一個簡單的Spring應用程序啓動。春季啓動AMQP接收

我得到一個錯誤,指出隊列我對收到不存在。當我看着http://localhost:15672/#/queues我可以看到它的確如此;我在開始這個應用程序之前刪除它。

我在另一篇文章中讀到,如果隊列不存在,RabbitAdmin已被宣佈與declareExchange()和declareQueue()來克服這一點。 即使有這個地方,我仍然看到同樣的錯誤。此外,該誤差是混亂的B/C它指出:

所致:com.rabbitmq.client.ShutdownSignalException:信道誤差; protocol method:#method(reply-code = 404,reply-text = NOT_FOUND-no queue'from-logger-exchange'in vhost'/',class-id = 50,method-id = 10)

請注意'無隊列'表示我的交換名稱。

下面是配置代碼(我意識到,引導創建connectionFactory的& RabbitAdmin但開始加入這些時不工作):

'import javax.annotation.PostConstruct; 

import org.springframework.amqp.core.Binding; 
import org.springframework.amqp.core.BindingBuilder; 
import org.springframework.amqp.core.FanoutExchange; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.core.TopicExchange; 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitAdmin; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class Config { 
    @Value("${spring.activemq.broker-url}") 
    String host; 
    @Value("${spring.activemq.user}") 
    String userName; 
    @Value("${spring.activemq.password}") 
    String password; 

    @Value("${config.exchangeName}") 
    String exchangeName; 

    @Value("${config.queueName}") 
    String queueName; 

    @PostConstruct 
    public void printVariables() { 
     System.out.println("host " + host); 
     System.out.println("userName " + userName); 
     System.out.println("password " + password); 
     System.out.println("exchangeName " + exchangeName); 
     System.out.println("queueName " + queueName); 

     System.out.println("queue.name " + queue().getName()); 
     System.out.println("exchange.name " + topicExchange().getName()); 
     System.out.println("binding " + binding().toString()); 

     System.out.println("connectionFactory " + connectionFactory().toString()); 
     System.out.println("rabbitAdmin " + rabbitAdmin().toString()); 
    } 

    @Bean 
    public ConnectionFactory connectionFactory() { 
     CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host); 
     connectionFactory.setUsername(userName); 
     connectionFactory.setPassword(password); 
     return connectionFactory; 
    } 

    @Bean 
    public RabbitAdmin rabbitAdmin() { 
     RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); 
     rabbitAdmin.declareExchange(topicExchange()); 
     rabbitAdmin.declareQueue(queue()); 
     return rabbitAdmin; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue(queueName); 
    } 

// @Bean 
// public FanoutExchange fanoutExchange() { 
//  return new FanoutExchange(exchangeName); 
// } 

    public TopicExchange topicExchange() { 
     return new TopicExchange(exchangeName, false, true); 
    } 

    @Bean 
    public Binding binding() { 
     //return BindingBuilder.bind(queue()).to(fanoutExchange()); 
     return BindingBuilder.bind(queue()).to(topicExchange()).with("my.routing.key"); 
    } 

    @Bean 
    public Receiver receiver() { 
     return new Receiver(); 
    } 
} 

'

這裏的接收器:

import org.springframework.amqp.rabbit.annotation.RabbitListener; 

public class Receiver { 

    @RabbitListener(queues="${config.exchangeName}") 
    public void receive(String input) { 
     System.out.println(" Receiver#receive input: " + input); 
    } 
} 

這裏是控制檯輸出的第一位:

. ____   _   __ 
/\\/___'_ __ _ _(_)_ __ __ _ \ \ \ \ 
(()\___ | '_ | '_| | '_ \/ _` | \ \ \ \ 
\\/ ___)| |_)| | | | | || (_| | )))) 
    ' |____| .__|_| |_|_| |_\__, |//// 
=========|_|==============|___/=/_/_/_/ 
:: Spring Boot ::  (v1.3.2.RELEASE) 

2016-02-23 10:20:47.710 INFO 18804 --- [   main] c.i.logging.receiver.Application   : Starting Application on INT002520 with PID 18804 (C:\Users\matt.aspen\Documents\_logging\log-message-receiver2\target\classes started by matt.aspen in C:\Users\matt.aspen\Documents\_logging\log-message-receiver2) 
2016-02-23 10:20:47.710 INFO 18804 --- [   main] c.i.logging.receiver.Application   : No active profile set, falling back to default profiles: default 
2016-02-23 10:20:47.710 DEBUG 18804 --- [   main] o.s.boot.SpringApplication    : Loading source class com.intelligrated.logging.receiver.Application 
2016-02-23 10:20:47.750 DEBUG 18804 --- [   main] o.s.b.c.c.ConfigFileApplicationListener : Loaded config file 'classpath:/application.properties' 
2016-02-23 10:20:47.750 DEBUG 18804 --- [   main] o.s.b.c.c.ConfigFileApplicationListener : Skipped (empty) config file 'classpath:/application.properties' for profile default 
2016-02-23 10:20:47.750 INFO 18804 --- [   main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.spring[email protected]679b62af: startup date [Tue Feb 23 10:20:47 PST 2016]; root of context hierarchy 
2016-02-23 10:20:48.482 INFO 18804 --- [   main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$68858653] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 
host localhost 
userName guest 
password guest 
exchangeName from-logger-exchange 
queueName from-logger-queue 
queue.name from-logger-queue 
exchange.name from-logger-exchange 
binding Binding [destination=from-logger-queue, exchange=from-logger-exchange, routingKey=my.routing.key] 
connectionFactory CachingConnectionFactory [channelCacheSize=1, host=localhost, port=5672, active=true connectionFactory] 
2016-02-23 10:20:48.642 INFO 18804 --- [   main] o.s.a.r.c.CachingConnectionFactory  : Created new connection: [email protected] [delegate=amqp://[email protected]:5672/] 
rabbitAdmin [email protected] 
2016-02-23 10:20:48.753 DEBUG 18804 --- [   main] inMXBeanRegistrar$SpringApplicationAdmin : Application Admin MBean registered with name 'org.springframework.boot:type=Admin,name=SpringApplication' 
2016-02-23 10:20:48.998 INFO 18804 --- [   main] o.s.j.e.a.AnnotationMBeanExporter  : Registering beans for JMX exposure on startup 
2016-02-23 10:20:49.208 INFO 18804 --- [   main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648 
2016-02-23 10:20:49.208 INFO 18804 --- [   main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647 
2016-02-23 10:20:49.218 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue:from-logger-exchange 
2016-02-23 10:20:49.228 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3 

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[from-logger-exchange] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1171) [spring-rabbit-1.5.3.RELEASE.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_40] 
Caused by: java.io.IOException: null 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) ~[amqp-client-3.5.7.jar:na] 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_40] 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_40] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_40] 
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_40] 
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:764) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at com.sun.proxy.$Proxy31.queueDeclarePassive(Unknown Source) ~[na:na] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    ... 3 common frames omitted 
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'from-logger-exchange' in vhost '/', class-id=50, method-id=10) 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.5.7.jar:na] 
    ... 12 common frames omitted 
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'from-logger-exchange' in vhost '/', class-id=50, method-id=10) 
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.5.7.jar:na] 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554) ~[amqp-client-3.5.7.jar:na] 
    ... 1 common frames omitted 

2016-02-23 10:20:54.226 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue:from-logger-exchange 
2016-02-23 10:20:54.226 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=2 

什麼是更令人困惑的是,當我看到http://localhost:15672/#/connections我看到連接,http://localhost:15672/#/channels我看到通道,http://localhost:15672/#/exchanges我看到「從記錄器交換」,並http://localhost:15672/#/queues我看到「從記錄器隊列」

回答

0

一個問題是您的兌換不是@Bean

我得到

reply-code=404, reply-text=NOT_FOUND - no exchange 'from.logging.exchange' in vhost '/', class-id=50, method-id=20) 

與你的配置。將@Bean添加到交易所修復它。

編輯

而且(如下面阿爾喬姆指出的),你對聽衆的錯財產

@RabbitListener(queues="${config.exchangeName}") 

應該

@RabbitListener(queues="${config.queueName}") 
+0

'@RabbitListener(隊列=」 $ {config.exchangeName}「)'指向'TopicExchange'。真的不是'隊列'...聽衆只能聽真正的'隊列'。也許'$ {config.queueName}'會更好?.. –