2014-04-20 85 views
1

我想用springmq使用spring amqp,下面是我的配置。spring amqp rabbitmq MessageListener不工作

<rabbit:connection-factory id="rabbitConnectionFactory" 
    port="${rabbitmq.port}" host="${rabbitmq.host}" /> 

<rabbit:admin connection-factory="rabbitConnectionFactory" /> 

<rabbit:queue name="${rabbitmq.import.queue}" /> 

<rabbit:template id="importAmqpTemplate" 
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" /> 

<beans:bean id="importExchangeMessageListener" 
    class="com.stockopedia.batch.foundation.ImportMessageListener" /> 

<rabbit:listener-container 
    connection-factory="rabbitConnectionFactory" concurrency="5"> 
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" /> 
</rabbit:listener-container> 

這是簡單的消息監聽器類,

import org.springframework.amqp.core.Message; 
import org.springframework.amqp.core.MessageListener; 

public class ImportMessageListener implements MessageListener { 

    @Override 
    public void onMessage(Message message) { 
     System.out.println("consumer output: " + message); 
    } 

} 

這是製片人(這是春天一批itemWriter),

public class ImportItemWriter<T> implements ItemWriter<T> { 

    private AmqpTemplate template; 

    public AmqpTemplate getTemplate() { 
     return template; 
    } 

    public void setTemplate(AmqpTemplate template) { 
     this.template = template; 
    } 

    public void write(List<? extends T> items) throws Exception { 
     for (T item : items) { 
      Object reply = template.convertSendAndReceive(item.toString()); 
      System.out.println("producer output: " + reply); 
     } 
    } 

} 

當我跑我的春天批處理作業,ImportItemWriter。寫入被調用。但ImportMessageListener.onMessage不起作用。它不打印消息。我得到下面的所有項目輸出在控制檯上

producer output: null 
producer output: null 
producer output: null 
producer output: null 
producer output: null 
producer output: null 
producer output: null 

回答

4

你的消費不發送的結果...

@Override 
public void onMessage(Message message) { 
    System.out.println("consumer output: " + message); 
} 

將其更改爲一個簡單的POJO;容器的MessageListenerAdapter將負責您的轉換,併發送結果。

@Override 
public String handleMessage(String message) { 
    System.out.println("consumer output: " + message); 
    return "result"; 
} 

編輯:

您還沒有建立到隊列中的任何交換或路由。如果您想使用默認的交換/路由,使用...

convertSendAndReceive("", queueName, item.toString()); 

EDIT2:

或者,將routingKey在模板上的隊列名稱,然後你可以用更簡單的方法。

...sendAndReceive()方法用於請求/回覆場景,因此需要阻止。要異步執行此操作,您必須使用...send()方法之一,並將自己的SimpleListenerContainer連接起來以接收答覆;你將不得不做你自己的關聯。使用

public void convertAndSend(Object message, MessagePostProcessor postProcessor) 

,並在您的郵件後處理器,設置replyTocorrelationId頭...

message.getMessageProperties().setReplyTo("foo"); 
message.getMessageProperties().setCorrelationId("bar"); 

或者建立自己的Message對象(例如by using the MessageBuilder),並使用send方法...

template.send(MessageBuilder.withBody("foo".getBytes()) 
      .setReplyTo("bar") 
      .setCorrelationId("baz".getBytes()) 
      .build()); 

每個請求都需要一個唯一的correlationId這樣你就可以響應相關。

+0

編輯了另一個我注意到的錯誤。 –

+0

謝謝,我不想從每個convertSendAndReceive調用傳遞queueName和交換。我怎樣才能配置它在模板配置?在文檔中,配置如上所述。我想使用默認交換。 – vishal

+0

感謝添加只是路由鍵相同的隊列名稱工作! – vishal