2013-07-25 105 views
1

這是事情。如何從AMQP(RabbitMQ)隊列中刪除消息?

我從RabbitMQ的爲了處理上發送每封電子郵件的重要信息使用PHP AMQP讀取結果隊列。完成後,我需要刪除或標記爲已寫入的消息,以便下次讀隊列時我不會收到已處理的消息。

由於Rabbitmq服務器每小時發送超過10,000封電子郵件,每當我讀取隊列以處理結果發送時,腳本可以運行至少5分鐘以處理隊列中的所有郵件,所以在它之後已經完成,在這5分鐘內有數百個新消息被放置。這使得我無法在腳本完成後清除隊列,因爲它會在腳本運行期間刪除未處理的腳本。

這給我留下了只有一個選擇。在AMQP腳本處理或讀取之後立即標記或刪除消息。

有沒有辦法做到這一點? (這裏是腳本)

<?php 
/** 
* result.php 
* Script that connects to RabbitMQ, and takes the result message from 
* the result message queue. 
*/ 

// include the settings 
require_once('settings.php'); 

// try to set up a connection to the RabbitMQ server 
try 
{ 
    // construct the connection to the RabbitMQ server 
    $connection = new AMQPConnection(array(
     'host'  => $hostname, 
     'login'  => $username, 
     'password' => $password, 
     'vhost'  => $vhost 
    )); 

    // connect to the RabbitMQ server 
    $connection->connect(); 
} 
catch (AMQPException $exception) 
{ 
    echo "Could not establish a connection to the RabbitMQ server.\n"; 
} 

// try to create the channel 
try 
{ 
    // open the channel 
    $channel = new AMQPChannel($connection); 
} 
catch (AMQPConnectionException $exception) 
{ 
    echo "Connection to the broker was lost (creating channel).\n"; 
} 

// try to create the queue 
try 
{ 
    // create the queue and bind the exchange 
    $queue = new AMQPQueue($channel); 
    $queue->setName($resultbox); 
    $queue->setFlags(AMQP_DURABLE); 
    $queue->bind('exchange1', 'key1'); 
    $queue->declare(); 
} 
catch (AMQPQueueException $exception) 
{ 
    echo "Channel is not connected to a broker (creating queue).\n"; 
} 
catch (AMQPConnectionException $exception) 
{ 
    echo "Connection to the broker was lost. (creating queue)/\n"; 
} 

// Get the message from the queue. 
while ($envelope = $queue->get()) { 
    //Function that processes the message 
    process_message($envelope->getBody()); 
} 
    $queue->purge(); 

// done, close the connection to RabbitMQ 
$connection->disconnect(); 
?> 

回答

3

確認消息(S)$queue->ack()成功處理後,甚至消耗/與AMQP_AUTOACK標誌得到他們。

UPD:

基於您的代碼:

while ($envelope = $queue->get(AMQP_AUTOACK)) { 
    //Function that processes the message 
    process_message($envelope->getBody()); 
} 

PS:

1. Ack'ing消息
while ($envelope = $queue->get()) { 
    //Function that processes the message 
    process_message($envelope->getBody()); 
    $queue->ack($envelope->getDeliveryTag()); 
} 
2. AMQP_AUTOACK標誌獲取它

檢查AMQPQueue::consume文檔,看起來是更適合這裏。

3.您可以使用和它被處理後,確認消息:
$queue->consume(function ($envelope, $queue) { 
     process_message($envelope->getBody()); 
     $queue->ack($envelope->getDeliveryTag()); 
}); 
4或 AMQP_AUTOACK標誌消耗,但是當處理。第失敗,你將無法再次處理消息:
$queue->consume(function ($envelope, $queue) { 
     process_message($envelope->getBody()); 
     $queue->ack($envelope->getDeliveryTag()); 
}, AMQP_AUTOACK); 

結論:我會推薦使用#3解決方案,但這取決於你。

+0

我如何能實現它的PHP代碼?謝謝 – rodvela

+0

用代碼更新了我的答案。我相信你已經閱讀過,但以防萬一,這裏是官方文檔http://php.net/manual/en/book.amqp.php,它在某些部分有些過時,但仍然很好理解如何使用php-amqp並且有一些你可能會覺得有用的例子 – pinepain

+0

ps看看這裏的例子https://github.com/pinepain/amqpy/tree/master/demo/canonical – pinepain

相關問題