1
這是事情。如何從AMQP(RabbitMQ)隊列中刪除消息?
我從RabbitMQ的爲了處理上發送每封電子郵件的重要信息使用PHP AMQP讀取結果隊列。完成後,我需要刪除或標記爲已寫入的消息,以便下次讀隊列時我不會收到已處理的消息。
由於Rabbitmq服務器每小時發送超過10,000封電子郵件,每當我讀取隊列以處理結果發送時,腳本可以運行至少5分鐘以處理隊列中的所有郵件,所以在它之後已經完成,在這5分鐘內有數百個新消息被放置。這使得我無法在腳本完成後清除隊列,因爲它會在腳本運行期間刪除未處理的腳本。
這給我留下了只有一個選擇。在AMQP腳本處理或讀取之後立即標記或刪除消息。
有沒有辦法做到這一點? (這裏是腳本)
<?php
/**
* result.php
* Script that connects to RabbitMQ, and takes the result message from
* the result message queue.
*/
// include the settings
require_once('settings.php');
// try to set up a connection to the RabbitMQ server
try
{
// construct the connection to the RabbitMQ server
$connection = new AMQPConnection(array(
'host' => $hostname,
'login' => $username,
'password' => $password,
'vhost' => $vhost
));
// connect to the RabbitMQ server
$connection->connect();
}
catch (AMQPException $exception)
{
echo "Could not establish a connection to the RabbitMQ server.\n";
}
// try to create the channel
try
{
// open the channel
$channel = new AMQPChannel($connection);
}
catch (AMQPConnectionException $exception)
{
echo "Connection to the broker was lost (creating channel).\n";
}
// try to create the queue
try
{
// create the queue and bind the exchange
$queue = new AMQPQueue($channel);
$queue->setName($resultbox);
$queue->setFlags(AMQP_DURABLE);
$queue->bind('exchange1', 'key1');
$queue->declare();
}
catch (AMQPQueueException $exception)
{
echo "Channel is not connected to a broker (creating queue).\n";
}
catch (AMQPConnectionException $exception)
{
echo "Connection to the broker was lost. (creating queue)/\n";
}
// Get the message from the queue.
while ($envelope = $queue->get()) {
//Function that processes the message
process_message($envelope->getBody());
}
$queue->purge();
// done, close the connection to RabbitMQ
$connection->disconnect();
?>
我如何能實現它的PHP代碼?謝謝 – rodvela
用代碼更新了我的答案。我相信你已經閱讀過,但以防萬一,這裏是官方文檔http://php.net/manual/en/book.amqp.php,它在某些部分有些過時,但仍然很好理解如何使用php-amqp並且有一些你可能會覺得有用的例子 – pinepain
ps看看這裏的例子https://github.com/pinepain/amqpy/tree/master/demo/canonical – pinepain