2013-06-12 32 views
9

RabbitMQ的新手和Java新手。如何在使用Spring ChannelAwareMessageListener時處理RabbitMQ消費者取消通知

我試圖寫,將使用手動ACK和處理使用java春AMQP抽象消費者取消通知的監聽器。我可以通過使用Spring抽象來完成這兩項任務嗎?

我想寫一個偵聽器,它將從隊列中提取消息並處理該消息(可能寫入數據庫或其他東西)。我計劃使用手動確認,這樣如果處理消息失敗或由於某種原因無法完成,我可以拒絕並重新發送。到目前爲止,我想我已經發現,爲了使用Spring AMQP手動確認/確認/拒絕,我必須使用ChannelAwareMessageListener

我意識到我應該從RabbitMQ處理消費者取消通知,但是使用ChannelAwareMessageListener我真的沒有看到爲此編寫代碼的方法。我認爲處理CCN的唯一方法是使用較低級別的java客戶端API編寫代碼,方法是呼叫channel.basicConsume()並傳遞一個新的DefaultConsumer實例,該實例允許您處理消息傳遞和取消。

我也看不到我如何在ConnectionFactory上設置clientProperties(告訴代理我可以處理CCN),因爲我從配置中的bean獲取工廠。

我的偵聽器的僞代碼和容器的創建如下。

public class MyChannelAwareListener implements ChannelAwareMessageListener 
{ 
    @Override 
    public void onMessage(Message message, Channel channel) throws Exception 
    { 
     msgProcessed = processMessage(message); 

     if(msgProcessed)  
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
     else 
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); 
    } 
} 

public static void main(String[] args) throws Exception 
{ 
    ConnectionFactory rabbitConnectionFactory; 
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH); 
    rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory"); 

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 

    MyChannelAwareListener listener = new MyChannelAwareListener(); 
    container.setMessageListener(listener); 
    container.setQueueNames("myQueue"); 
    container.setConnectionFactory(rabbitConnectionFactory); 
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
    container.start(); 
} 

回答

1

爲了設置您需要使用setClientProperties方法在ConnectionFactory客戶端屬性(假設該連接工廠是從RabbitMQ的Java庫的對象)。此方法期望Map<String, Object>包含客戶端的屬性和功能。下面的線是RabbitMQ的Java庫中的默認值:

Map<String,Object> props = new HashMap<String, Object>(); 
props.put("product", LongStringHelper.asLongString("RabbitMQ")); 
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION)); 
props.put("platform", LongStringHelper.asLongString("Java")); 
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT)); 
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE)); 

Map<String, Object> capabilities = new HashMap<String, Object>(); 
capabilities.put("publisher_confirms", true); 
capabilities.put("exchange_exchange_bindings", true); 
capabilities.put("basic.nack", true); 
capabilities.put("consumer_cancel_notify", true); 

props.put("capabilities", capabilities); 

爲了管理ACK和消費者取消我不知道如何與Spring AMQP抽象做到這一點,但它是完全可行與channel.basicConsume它給你有可能通過所有的回調方法處理所有的場景:

http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/

希望這有助於!