2010-12-14 216 views
23

是否可以通過RabbitMQ發送消息並延遲一段時間? 例如,我想在30分鐘後過期客戶端會話,併發送30分鐘後將處理的消息。RabbitMQ中的延遲消息

+0

你需要使用的RabbitMQ? – 2013-02-13 21:49:29

+1

是的,自RabbitMQ-3.5.8開始提供此功能。 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar 2017-06-03 02:11:43

+0

如果你使用Spring AMQP,那麼[支持插件](https://docs.spring.io/spring-amqp/reference/htmlsingle /#延遲消息交換)。 – Gruber 2017-09-07 13:19:30

回答

5

這是目前不可能的。您必須將過期時間戳存儲在數據庫中或類似的東西上,然後有一個幫助程序讀取這些時間戳並排隊消息。

延遲消息是經常被要求的功能,因爲它們在很多情況下都很有用。但是,如果您的需要是過期的客戶端會話,我認爲消息傳遞不是您的理想解決方案,而另一種方法可能會更好。

9

隨着RabbitMQ的V2.8的發佈,預定交付現已但是作爲一個間接的功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

+0

我試過這種方法,但遇到了一些問題,建議任何一個? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 – 2013-01-15 13:46:16

+5

我做了一個秒殺,打了幾個showstoppers: 1.消息只有DLQ:en在Q的頂部(http://www.rabbitmq.com/ttl.html - 注意部分) 這意味着如果我第一次設置消息1將在4小時後過期並且msg2在1小時後過期msg2只會在msg1過期後過期。 2.消息的TTL由Rabbit保存,所以可以說你使用10秒的短暫超時。如果消費者在消息過期10秒後仍然無法使用消息(由於積壓),消息將被丟棄並丟失 以上內容已通過兔子3.0.1驗證 您是否看到任何解決方法? – 2013-01-16 14:51:42

6

由於我沒有足夠的聲譽添加評論,發佈新的答案。這只是在http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

已經討論過的內容的補充,除了在消息上設置ttl之外,您可以將其設置在隊列級別。此外,您可以避免創建新的交換,只是爲了將消息重定向到不同的隊列。下面是示例Java代碼:

監製:

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import java.util.HashMap; 
import java.util.Map; 

public class DelayedProducer { 
    private final static String QUEUE_NAME = "ParkingQueue"; 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory connectionFactory = new ConnectionFactory(); 
     connectionFactory.setHost("localhost"); 
     Connection connection = connectionFactory.newConnection(); 
     Channel channel = connection.createChannel(); 

     Map<String, Object> arguments = new HashMap<String, Object>(); 
     arguments.put("x-message-ttl", 10000); 
     arguments.put("x-dead-letter-exchange", ""); 
     arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME); 
     channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 

     for (int i=0; i<5; i++) { 
      String message = "This is a sample message " + i; 
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
      System.out.println("message "+i+" got published to the queue!"); 
      Thread.sleep(3000); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

消費者:

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 

public class Consumer { 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     boolean autoAck = false; 
     channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer); 

     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      System.out.println(" [x] Received '" + message + "'"); 
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
     } 

    } 
} 
+0

非常感謝。我認爲你在消費者隊列中有一個小錯誤declare channel.queueDeclare(QUEUE_NAME,false,false,false,null);它應該有「DESTINATION_QUEUE_NAME」而不是「QUEUE_NAME」。非常非常感謝你 – 2016-09-19 21:45:42

5

它看起來像this blog post描述了使用死信交換和消息的TTL做類似的事情。

下面的代碼使用CoffeeScript和Node.JS來訪問Rabbit並實現類似的東西。

amqp = require 'amqp' 
events = require 'events' 
em  = new events.EventEmitter() 
conn = amqp.createConnection() 

key = "send.later.#{new Date().getTime()}" 
conn.on 'ready', -> 
    conn.queue key, { 
    arguments:{ 
     "x-dead-letter-exchange":"immediate" 
    , "x-message-ttl": 5000 
    , "x-expires": 6000 
    } 
    }, -> 
    conn.publish key, {v:1}, {contentType:'application/json'} 

    conn.exchange 'immediate' 

    conn.queue 'right.now.queue', { 
     autoDelete: false 
    , durable: true 
    }, (q) -> 
    q.bind('immediate', 'right.now.queue') 
    q.subscribe (msg, headers, deliveryInfo) -> 
     console.log msg 
     console.log headers 
0

假設你有控制權的消費者,可以實現對消費者這樣??延遲:

如果我們相信,在隊列中的第n個消息總是具有較小的延遲比第n + 1條消息多(這在許多用例中都可以):生產者在任務中發送timeInformation來傳達需要執行該任務的時間(currentTime + delay)。消費者:

1)讀取任務

2)scheduledTime如果currentTime的> scheduledTime繼續。

否則延遲= scheduledTime - currentTime的

睡眠時間由延遲

消費者指示總是與併發參數配置。所以,其他消息將在隊列中等待,直到消費者完成工作。所以,這個解決方案可以工作得很好,雖然看起來很尷尬,特別是對於大的時間延遲。

5

感謝諾曼的回答,我可以在NodeJS中實現它。

代碼中的一切都很清楚。 希望它能節省一些人的時間。

var ch = channel; 
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false}); 
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false}); 

// setup intermediate queue which will never be listened. 
// all messages are TTLed so when they are "dead", they come to another exchange 
ch.assertQueue("my_intermediate_queue", { 
     deadLetterExchange: "my_final_delayed_exchange", 
     messageTtl: 5000, // 5sec 
}, function (err, q) { 
     ch.bindQueue(q.queue, "my_intermediate_exchange", ''); 
}); 

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { 
     ch.bindQueue(q.queue, "my_final_delayed_exchange", ''); 

     ch.consume(q.queue, function (msg) { 
      console.log("delayed - [x] %s", msg.content.toString()); 
     }, {noAck: true}); 
}); 
4

有兩種方法可以嘗試:

老方法:設置TTL(生存時間)報頭中的每個消息/隊列(策略),然後引入DLQ處理它。一旦ttl過期,您的消息將從DLQ移動到主隊列,以便您的聽衆可以處理它。

最新方法:最近的RabbitMQ想出了RabbitMQ的延遲郵件插件,利用它可以實現相同的,因爲RabbitMQ的-3.5.8可用此插件的支持。

您可以使用類型x-delayed-message聲明交換,然後使用自定義標題x-delay以毫秒爲單位發佈消息,以表示消息的延遲時間。該消息將在X-delay毫秒

這裏更多地傳遞到相應的隊列:git