RabbitMQ的新手和Java新手。如何在使用Spring ChannelAwareMessageListener時處理RabbitMQ消費者取消通知
我試圖寫,將使用手動ACK和處理使用java春AMQP抽象消費者取消通知的監聽器。我可以通過使用Spring抽象來完成這兩項任務嗎?
我想寫一個偵聽器,它將從隊列中提取消息並處理該消息(可能寫入數據庫或其他東西)。我計劃使用手動確認,這樣如果處理消息失敗或由於某種原因無法完成,我可以拒絕並重新發送。到目前爲止,我想我已經發現,爲了使用Spring AMQP手動確認/確認/拒絕,我必須使用ChannelAwareMessageListener
。
我意識到我應該從RabbitMQ處理消費者取消通知,但是使用ChannelAwareMessageListener
我真的沒有看到爲此編寫代碼的方法。我認爲處理CCN的唯一方法是使用較低級別的java客戶端API編寫代碼,方法是呼叫channel.basicConsume()
並傳遞一個新的DefaultConsumer
實例,該實例允許您處理消息傳遞和取消。
我也看不到我如何在ConnectionFactory
上設置clientProperties
(告訴代理我可以處理CCN),因爲我從配置中的bean獲取工廠。
我的偵聽器的僞代碼和容器的創建如下。
public class MyChannelAwareListener implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
msgProcessed = processMessage(message);
if(msgProcessed)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
public static void main(String[] args) throws Exception
{
ConnectionFactory rabbitConnectionFactory;
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH);
rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
MyChannelAwareListener listener = new MyChannelAwareListener();
container.setMessageListener(listener);
container.setQueueNames("myQueue");
container.setConnectionFactory(rabbitConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
}