2015-06-21 52 views
2

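Spring AMQP project with 2 queues

I am working on a project that involves two queues and multiple listeners interacting with them. The flow is:

  • A new HTTP request arrives at the server and is converted into an object, which becomes the message
  • This message is published to both queues
  • I have two types of listeners, each reading messages from one of the queues, and then I do whatever I need with the message

I have been reading that the best approach is to use a fanout exchange. Here is my code: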

listener-configuration.xml

<!-- CREATE CONNECTION FACTORY --> 
<rabbit:connection-factory id="connectionFactory" 
    host="localhost" username="guest" password="guest" /> 

<rabbit:admin connection-factory="connectionFactory" /> 

<!-- <!-- RABBIT QUEUE'S --> 
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false" auto-startup=false 
    durable="true" /> 
<!-- Webapp Queue --> 
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false" auto-startup=false 
    durable="true" /> 

<!-- CREATE AN EXCHANGE AND BIND THE QUEUE WITH MY.ROUTINGKEY.* TO THE EXCHANGE --> 
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange"> 
    <rabbit:bindings> 
     <rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding> 
     <rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding> 
    </rabbit:bindings> 
</rabbit:fanout-exchange> 

<!-- CREATE THE RABBIT TEMPLATES --> 
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute.rabbit.queue"/> 
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute2.rabbit.queue"/> 

<!-- INSTANTIATE THE LISTENERS --> 
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" /> 
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" /> 

<!-- CREATE THE JsonMessageConverter BEAN --> 
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" /> 

<!-- GLUE THE LISTENER AND QUEUE TO THE LISTENER CONTAINER --> 
<rabbit:listener-container id="listenerContainer" 
    connection-factory="connectionFactory" message-converter="jsonMessageConverter"> 
    <rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" /> 
    <rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" /> 
</rabbit:listener-container> 

sender-configuration.xml

<!-- First following line creates a rabbit connection factory with specified parameters --> 
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" /> 

<!-- Obtain admin rights to create an exchange --> 
<rabbit:admin connection-factory="connectionFactory" /> 

<!-- Create a bean which can send message to trashroute-exchange for the Java program to call --> 
<rabbit:template id="template" connection-factory="connectionFactory" exchange="myExchange" 
message-converter="jsonMessageConverter" /> 


<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> 
<property name="connectionFactory" ref="rabbitConnectionFactory"/> 
<property name="messageConverter"> 
    <bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/> 
</property> 

Listener MainConfiguration.java

@Configuration 
public class MainConfiguration { 

protected final String persistenceQueue = "trashroute.rabbit.queue"; 
protected final String webappQueue = "trashroute2.rabbit.queue"; 

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    return connectionFactory; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public DataController DataController(){ 
    return new DataController(); 
} 

@Bean 
// Every queue is bound to the default direct exchange 
public Queue persistenceQueue() { 
    //Create a new queue with an specific name and the durability value in true. 
    return new Queue(this.persistenceQueue, true); 
} 

@Bean 
public Queue webappQueue() { 
    //Create a new queue with an specific name and the durability value in true. 
    return new Queue(this.webappQueue, true); 
} 
} 

Sender MainConfiguration.java

@Configuration 
public class SenderConfiguration { 

protected final String persistenceQueue = "trashroute.rabbit.queue"; 
protected final String webappQueue = "trashroute2.rabbit.queue"; 

//Create the Template 
@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    template.setMessageConverter(new JsonMessageConverter()); 
    return template; 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
      "localhost"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    return connectionFactory; 
} 

@Bean 
public IServiceManager scheduledProducer() { 
    return new ServiceManagerImpl(); 
} 

@Bean 
public BeanPostProcessor postProcessor() { 
    return new ScheduledAnnotationBeanPostProcessor(); 
} 

} 

Can anyone tell me what I am doing wrong? One of the two listeners works perfectly, but the second one never reads a message.

+0

Are you using JavaConfig and XML configuration at the same time? They look redundant.

+0

Yes, because some parts of the configuration did not work in XML – ecastellano

+0

That may be due to some other reason. You should not mix XML and Java Config. Use one or the other.

Answer

10

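Based on the scenario explained above, I tried to create a sample application that uses Spring Java Config.

Messages are published to the trashroute and webapp queues, and the respective receivers (persistence and webapp) receive them.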
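To make the intended routing concrete, here is a minimal plain-Java model of what a fanout exchange does: every queue bound to the exchange receives its own copy of each published message, and the routing key is ignored. It does not talk to RabbitMQ. The class `FanoutSketch` and its nested `Exchange` type are hypothetical names used only for illustration; the queue names are the ones used in this answer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of fanout routing; this is not the Spring AMQP API.
public class FanoutSketch {

    // A fanout exchange copies each message to every bound queue.
    public static class Exchange {
        private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

        // Bind a queue by name; binding the same name twice has no extra effect.
        public void bind(String queueName) {
            boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
        }

        // The routing key is accepted but ignored, as with a real fanout exchange.
        public void publish(String routingKey, String message) {
            for (Queue<String> queue : boundQueues.values()) {
                queue.add(message);
            }
        }

        // Remove and return everything currently waiting in one queue.
        public List<String> drain(String queueName) {
            List<String> drained = new ArrayList<>();
            Queue<String> queue = boundQueues.get(queueName);
            if (queue == null) {
                return drained;
            }
            while (!queue.isEmpty()) {
                drained.add(queue.poll());
            }
            return drained;
        }
    }

    public static void main(String[] args) {
        Exchange exchange = new Exchange();
        exchange.bind("trashroute.rabbit.queue");
        exchange.bind("webapp.rabbit.queue");

        exchange.publish("", "new request");

        // Each listener reads from its own queue and sees its own copy.
        System.out.println("persistence: " + exchange.drain("trashroute.rabbit.queue"));
        System.out.println("webapp: " + exchange.drain("webapp.rabbit.queue"));
    }
}
```

With a real broker, the equivalent publish in Spring AMQP would be the three-argument `RabbitTemplate.convertAndSend(exchange, routingKey, message)` with the exchange name and an empty routing key. The two-argument form `convertAndSend(routingKey, message)` sends to the template's default exchange with the queue name as the routing key, so it addresses one queue directly and bypasses the fanout exchange.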

RabbitConfiguration.java (contains the configuration for both the sender and the receivers)

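import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfiguration {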

    public static final String BROADCAST_TRASHROUTE_QUEUE = "trashroute.rabbit.queue"; 
    public static final String BROADCAST_WEBAPP_QUEUE = "webapp.rabbit.queue"; 

    @Bean 
    public ConnectionFactory connectionFactory() { 
     CachingConnectionFactory connectionFactory = 
       new CachingConnectionFactory("localhost"); 
     return connectionFactory; 
    } 


    @Bean 
    public AmqpAdmin amqpAdmin() { 
     return new RabbitAdmin(connectionFactory()); 
    } 

    @Bean 
    public Queue trashRouteQueue() { 
     return new Queue(BROADCAST_TRASHROUTE_QUEUE); 
    } 

    @Bean 
    public Queue webAppQueue() { 
     return new Queue(BROADCAST_WEBAPP_QUEUE); 
    } 

    @Bean 
    public RabbitTemplate rabbitTemplate() { 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); 
     return rabbitTemplate; 
    } 

    @Bean 
    public FanoutExchange trashRouteExchange() { 
     FanoutExchange exchange = new FanoutExchange("trashroute"); 
     return exchange; 
    } 

    @Bean 
    public Binding trashRouteBinding() { 
     return BindingBuilder.bind(trashRouteQueue()).to(trashRouteExchange()); 
    } 

    @Bean 
    public Binding webAppBinding() { 
     return BindingBuilder.bind(webAppQueue()).to(trashRouteExchange()); 
    } 

    // The persistence listener consumes only from the trashroute queue.
    @Bean
    SimpleMessageListenerContainer persistenceListenerContainer(ConnectionFactory connectionFactory, @Qualifier("persistenceListenerAdapter") MessageListenerAdapter listenerAdapter) {
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
     container.setConnectionFactory(connectionFactory);
     container.setQueues(trashRouteQueue());
     container.setMessageListener(listenerAdapter);
     return container;
    }

    // Delegates each message to PersistenceListener.receiveMessage(...).
    @Bean
    MessageListenerAdapter persistenceListenerAdapter(PersistenceListener receiver) {
     return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    // The webapp listener consumes only from the webapp queue, so the two
    // containers do not compete for the same messages.
    @Bean
    SimpleMessageListenerContainer webAppListenerContainer(ConnectionFactory connectionFactory, @Qualifier("webAppListenerAdapter") MessageListenerAdapter listenerAdapter) {
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
     container.setConnectionFactory(connectionFactory);
     container.setQueues(webAppQueue());
     container.setMessageListener(listenerAdapter);
     return container;
    }

    @Bean 
    MessageListenerAdapter webAppListenerAdapter(WebAppListener webAppListener) { 
     return new MessageListenerAdapter(webAppListener, "receiveMessage"); 
    } 

    @Bean 
    PersistenceListener persistenceListener() { 
     return new PersistenceListener(); 
    } 

    @Bean 
    WebAppListener webAppListener() { 
     return new WebAppListener(); 
    } 

} 

PersistenceListener.java

public class PersistenceListener { 

    public void receiveMessage(String message) { 
     System.out.println("Persistence Listener: Message Received <" + message + ">");
    } 
} 

WebAppListener.java

public class WebAppListener { 
    public void receiveMessage(String message) { 
     System.out.println("WebAppListener: Message Received <" + message + ">"); 
    } 
} 

Application.java

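import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

@SpringBootApplication
public class Application implements CommandLineRunner {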

    @Autowired 
    AnnotationConfigApplicationContext context; 

    public static void main(String[] args) throws InterruptedException { 
     SpringApplication.run(Application.class, args); 
    } 

    @Override 
    public void run(String... args) throws Exception { 
     System.out.println("Waiting five seconds..."); 
     Thread.sleep(5000); 
     System.out.println("Sending message..."); 

     RabbitTemplate rabbitTemplate = (RabbitTemplate) context.getBean("rabbitTemplate"); 

     rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_TRASHROUTE_QUEUE, "Hello from trashroute queue!"); 
     rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_WEBAPP_QUEUE, "Hello from webapp queue!"); 

     Thread.sleep(10000); 
     context.close(); 
    } 
} 

Hope this helps. If you want to use it in production, though, you will need to refactor the code.
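One point worth checking when refactoring is which queues each listener container consumes from. When two consumers are attached to the same queue, RabbitMQ hands each message to only one of them (roughly round-robin by default) instead of giving both a copy, which is why the broadcast pattern needs one queue per listener. The plain-Java sketch below models only that dispatch rule; `SharedQueueSketch` and `dispatch` are hypothetical names and are not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of consumers competing on one queue; not the Spring AMQP API.
public class SharedQueueSketch {

    // Hand each message to exactly one consumer, cycling through them in order.
    // Consumer names are assumed to be distinct.
    public static Map<String, List<String>> dispatch(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return received;
        }
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("persistenceListener", "webAppListener");
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");

        // Both listeners share one queue, so each sees only part of the stream.
        System.out.println(dispatch(consumers, messages));
    }
}
```

In this model no message reaches both listeners, which matches the symptom of a listener that appears to miss messages when it shares a queue with another consumer.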