是否可以通過RabbitMQ發送消息並延遲一段時間? 例如,我想在30分鐘後過期客戶端會話,併發送30分鐘後將處理的消息。RabbitMQ中的延遲消息
回答
這是目前不可能的。您必須將過期時間戳存儲在數據庫中或類似的東西上,然後有一個幫助程序讀取這些時間戳並排隊消息。
延遲消息是經常被要求的功能,因爲它們在很多情況下都很有用。但是,如果您的需要是過期的客戶端會話,我認爲消息傳遞不是您的理想解決方案,而另一種方法可能會更好。
隨着RabbitMQ的V2.8的發佈,預定交付現已但是作爲一個間接的功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
我試過這種方法,但遇到了一些問題,建議任何一個? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 – 2013-01-15 13:46:16
我做了一個秒殺,打了幾個showstoppers: 1.消息只有DLQ:en在Q的頂部(http://www.rabbitmq.com/ttl.html - 注意部分) 這意味着如果我第一次設置消息1將在4小時後過期並且msg2在1小時後過期msg2只會在msg1過期後過期。 2.消息的TTL由Rabbit保存,所以可以說你使用10秒的短暫超時。如果消費者在消息過期10秒後仍然無法使用消息(由於積壓),消息將被丟棄並丟失 以上內容已通過兔子3.0.1驗證 您是否看到任何解決方法? – 2013-01-16 14:51:42
由於我沒有足夠的聲譽添加評論,發佈新的答案。這只是在http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
已經討論過的內容的補充,除了在消息上設置ttl之外,您可以將其設置在隊列級別。此外,您可以避免創建新的交換,只是爲了將消息重定向到不同的隊列。下面是示例Java代碼:
監製:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class DelayedProducer {
private final static String QUEUE_NAME = "ParkingQueue";
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange", "");
arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for (int i=0; i<5; i++) {
String message = "This is a sample message " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("message "+i+" got published to the queue!");
Thread.sleep(3000);
}
channel.close();
connection.close();
}
}
消費者:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
非常感謝。我認爲你在消費者隊列中有一個小錯誤declare channel.queueDeclare(QUEUE_NAME,false,false,false,null);它應該有「DESTINATION_QUEUE_NAME」而不是「QUEUE_NAME」。非常非常感謝你 – 2016-09-19 21:45:42
它看起來像this blog post描述了使用死信交換和消息的TTL做類似的事情。
下面的代碼使用CoffeeScript和Node.JS來訪問Rabbit並實現類似的東西。
amqp = require 'amqp'
events = require 'events'
em = new events.EventEmitter()
conn = amqp.createConnection()
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
conn.queue key, {
arguments:{
"x-dead-letter-exchange":"immediate"
, "x-message-ttl": 5000
, "x-expires": 6000
}
}, ->
conn.publish key, {v:1}, {contentType:'application/json'}
conn.exchange 'immediate'
conn.queue 'right.now.queue', {
autoDelete: false
, durable: true
}, (q) ->
q.bind('immediate', 'right.now.queue')
q.subscribe (msg, headers, deliveryInfo) ->
console.log msg
console.log headers
假設你有控制權的消費者,可以實現對消費者這樣??延遲:
如果我們相信,在隊列中的第n個消息總是具有較小的延遲比第n + 1條消息多(這在許多用例中都可以):生產者在任務中發送timeInformation來傳達需要執行該任務的時間(currentTime + delay)。消費者:
1)讀取任務
2)scheduledTime如果currentTime的> scheduledTime繼續。
否則延遲= scheduledTime - currentTime的
睡眠時間由延遲
消費者指示總是與併發參數配置。所以,其他消息將在隊列中等待,直到消費者完成工作。所以,這個解決方案可以工作得很好,雖然看起來很尷尬,特別是對於大的時間延遲。
感謝諾曼的回答,我可以在NodeJS中實現它。
代碼中的一切都很清楚。 希望它能節省一些人的時間。
var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
有兩種方法可以嘗試:
老方法:設置TTL(生存時間)報頭中的每個消息/隊列(策略),然後引入DLQ處理它。一旦ttl過期,您的消息將從DLQ移動到主隊列,以便您的聽衆可以處理它。
最新方法:最近的RabbitMQ想出了RabbitMQ的延遲郵件插件,利用它可以實現相同的,因爲RabbitMQ的-3.5.8可用此插件的支持。
您可以使用類型x-delayed-message聲明交換,然後使用自定義標題x-delay以毫秒爲單位發佈消息,以表示消息的延遲時間。該消息將在X-delay毫秒
這裏更多地傳遞到相應的隊列:git
- 1. 延遲的消息循環與RabbitMQ的
- 2. 我怎麼會使用RabbitMQ的延遲消息交換插件在RabbitMQ的發送延遲的消息?
- 3. 消息延遲
- 4. 生產者和消費者之間的RabbitMQ消息延遲
- 5. Spring AMQP中的計劃/延遲消息傳遞RabbitMq
- 6. ToolStripStatusLabel延遲消息
- 7. MDB消息消費延遲
- 8. 春AMQP的消息延遲
- 9. 給每個消息一個自定義延遲(rabbitmq)?
- 10. NServiceBus延遲消息處理
- 11. ShowBalloonTip():消息出現延遲
- 12. 兩個jQuery延遲消息?
- 13. NServicebus延遲消息傳奇?
- 14. Websocket延遲發送消息
- 15. JMS隊列中的延遲消息
- 16. 卡夫卡延遲消息消耗
- 17. 如何測試Flex消息的延遲
- 18. JUnit的延遲斷言消息創建
- 19. 延遲NUnit的斷言消息評估
- 20. 帶消息延遲的批量重啓
- 21. RabbitMQ/AMQP中的消息組
- 22. RabbitMQ中的消息丟失
- 23. MQ消息被延遲幾天
- 24. NService總線消息延遲問題
- 25. 延遲消息隊列最佳實踐
- 26. vert.x sockJS實現延遲發送消息
- 27. 延遲或短暫暫停Windows消息
- 28. Google Cloud Messaging(GCM)下行消息延遲
- 29. 延遲調用消息隊列?
- 30. C#TCP第一個消息延遲
你需要使用的RabbitMQ? – 2013-02-13 21:49:29
是的,自RabbitMQ-3.5.8開始提供此功能。 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar 2017-06-03 02:11:43
如果你使用Spring AMQP,那麼[支持插件](https://docs.spring.io/spring-amqp/reference/htmlsingle /#延遲消息交換)。 – Gruber 2017-09-07 13:19:30