如果消息w ^因爲未確認且申請失敗,它將自動重新發送並且信封上的redelivered
屬性將設置爲true
(除非您使用no-ack = true
標誌消耗它們)。
UPD:
你必須nack
消息還船旗在你的catch塊
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
謹防無限NACK的消息同時交付計不RabbitMQ的和AMQP協議實施在所有。
如果你不想惹這些消息,只是想增加你可能想之前nack
方法調用添加一些sleep()
或usleep()
一些延遲,但它是不是一個好主意。
有多種技術來處理週期重新提交問題:
1.依靠Dead Letter Exchanges
- 優點:可靠的,標準的,明確的
- 缺點:需要額外的邏輯
2。使用per message or per queue TTL
- 優點:易於實現,也是標準的,明確的
- 缺點:人龍,你可能會失去一些消息
例子(注意,對於隊列TTL我們通過只數和消息TTL - 任何將數字串):
2.1%消息的TTL:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);
2.2。每個隊列TTL:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3.按住重新傳送數或左重新傳送號(又名一跳IP堆棧限制或TTL),在郵件正文或頭
- 優點:給你額外的控制on消息生存時間應用級別
- 缺點:重大的開銷,而您必須修改消息並重新發布它,應用程序特定的,不清楚
代碼:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;
try {
//Do some business logic
throw new Exception('business logic failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
有可能是一些其他的方式,以更好地控制消息重新傳送流。
結論:沒有銀子彈解決方案。你必須決定什麼解決方案適合你的需要最好或找出其他的東西,但不要忘記在這裏分享;)
你使用哪種語言,你能提供一些代碼? – pinepain
@ zaq178miami,看到我編輯的信息 –
@Bram_Gerritsen,看到我的答案更新 – pinepain