我在我的消費者內部拋出一個AmqpException。 我的期望是消息將按照先進先出的順序返回到隊列,並且將來會在某個時間進行重新處理。rabbitmq與春天amqp - 消息卡住的情況下AmqpException
Spring AMQP似乎並未將消息釋放回隊列。而是試圖一遍又一遍地重新處理失敗的消息。 這會阻止正在處理的新到消息。卡住的那些永久出現在AMQP控制檯內的「未打包」狀態。
有什麼想法?
我在我的消費者內部拋出一個AmqpException。 我的期望是消息將按照先進先出的順序返回到隊列,並且將來會在某個時間進行重新處理。rabbitmq與春天amqp - 消息卡住的情況下AmqpException
Spring AMQP似乎並未將消息釋放回隊列。而是試圖一遍又一遍地重新處理失敗的消息。 這會阻止正在處理的新到消息。卡住的那些永久出現在AMQP控制檯內的「未打包」狀態。
有什麼想法?
這就是rabbitmq/Spring AMQP的工作方式;如果消息被拒絕(引發任何異常),則默認情況下會重新發送消息,並將其放回到隊列的頭部,以便立即重試。
...在未來某個時間進行重新處理。
您必須適當地配置事情才能實現此目的。
首先,您必須告訴經紀人不要重新發送消息。這是通過將偵聽器容器上的defaultRequeueRejected
設置爲false(默認情況下爲true)完成的。或者,您可以拋出指示容器拒絕(而不是重新發送)單個消息的AmqpRejectAndDontRequeueException
。
但這並不是結束;只是這樣做只會導致被拒絕的信息被丟棄。
爲避免這種情況,您必須爲隊列設置Dead Letter Exchange/Queue - 拒絕的消息然後發送到DLX/DLQ而不是被丟棄。通常建議使用策略而不是隊列參數。
最後,您可以在DLQ上設置一條消息生存時間,以便在此之後消息從隊列中移除。如果您在上設置了另一個適當的無效信函交換隊列(DLQ),則可以使該消息在時間到期後重新排隊回原始隊列。
請注意,這隻適用於從原始隊列中拒收的貨物;在該隊列中的消息過期時它將不起作用。
請參閱this answer及其問題的一些鏈接瞭解更多詳情。
您可以使用x-death
標頭的內容來決定是否應該在嘗試一些次數後完全放棄(捕獲異常並以某種方式處置壞消息;不要拋出異常並且容器將響應信息)。
這是我用來解決這個問題的解決方案。我設置了一個攔截器,在應用退避策略的同時重試消息x次。 http://trippstech.blogspot.com/2016/03/rabbitmq-deadletter-queue-with.html
Gary,謝謝您的反饋。 – Vladimir 2015-02-06 04:26:29
我希望能夠使用Spring AMQP RetryOperationsInterceptor,它會在拋出AmqpException時啓動。一旦達到最大重試次數,「Recoverer」將拋出一個AmqpRejectAndDontRequeueException,這將拋棄該消息。我想到了重試攔截器的重試嘗試(可能需要幾個小時的指數退避),消息會返回隊列,從而允許處理新到的消息。 – Vladimir 2015-02-06 04:32:52
否;重試攔截器在將其傳遞給偵聽器之前僅阻塞該線程;它只適用於重試之間的短暫延遲;你需要像我描述的那樣使用經紀人來推遲重新投資。消費者沒有其他辦法可以告訴經紀人在重新投遞之前等待一段時間;它只是不是amqp協議的一部分。即使使用JMS,這種延遲也是代理商專有的,不屬於jms API。 – 2015-02-06 04:41:33