2013-10-07 96 views
0

我有一個使用Spring和RabbitMQ的項目設置。目前,我的應用程序可能會收到一個amqp消息,直到另一個異步過程完成(遺留的和完全分離的,我無法控制)才能被處理。所以結果是我可能不得不等待處理消息一段時間。這是變壓器的一個例外。隊列尾部重新排隊Amqp消息

當消息被NACK返回到rabbitMQ時,它將把它放回隊列頭並立即重新拉動它。如果我得到的不可處理的消息數量等於併發偵聽器的數量,我的工作流將鎖定。即使在隊列中存在有效的可處理消息,它也會等待消息的輪子變成可處理的。

有沒有辦法拒絕和amqp消息,並讓它回到隊列的尾部呢?從我的研究中,rabbitMQ一次以這種方式工作,但現在我似乎獨佔了隊列頭。

我的配置是相當簡單的,但是對於連續性這裏是......

連接工廠:org.springframework.amqp.rabbit.connection.CachingConnectionFactory 的RabbitMQ 3.1.1

Spring集成:2.2.0

<si:channel id="channel"/> 
<si-amqp:inbound-channel-adapter 
    queue-names="commit" channel="channel" connection-factory="amqpConnectionFactory" 
    acknowledge-mode="AUTO" concurrent-consumers="${listeners}" 
    channel-transacted="true" 
    transaction-manager="transactionManager"/> 

<si:chain input-channel="channel" output-channel="nullChannel"> 
    <si:transformer ref="transformer"></si:transformer> 
    <si:service-activator ref="activator"/> 
</si:chain> 

回答

0

你是對的,RabbitMQ在前一段時間被改變了。 API中沒有任何內容可以改變行爲。

可以,當然,放入適配器上的error-channel,隨後通過變壓器(expression="payload.failedMessage"),隨後通過用適當的交換/路由密鑰配置出站適配器的隊列的後面,以重新排隊的消息。

您可能想要在錯誤流中添加一些額外的邏輯來檢查異常類型(payload.cause)並決定您想要的操作。

如果錯誤流本身引發異常,原始消息將在頭部重新排序,如前所述;如果它正常退出,該消息將被查詢。