2017-03-24 99 views
14

最近,我在生產者/消費者隊列系統上做了一個快速實現。PHP AMQP延遲隊列的實現

<?php 
namespace Queue; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable;  

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     // First, we need to make sure that RabbitMQ will never lose our queue. 
     // In order to do so, we need to declare it as durable. To do so we pass 
     // the third parameter to queue_declare as true. 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    // Just in case : http://stackoverflow.com/questions/151660/can-i-trust-php-destruct-method-to-be-called 
    // We should call close explicitly if possible. 
    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      // First, we need to make sure that RabbitMQ will never lose our queue. 
      // In order to do so, we need to declare it as durable. To do so we pass 
      // the third parameter to queue_declare as true. 
      $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     // This tells RabbitMQ not to give more than one message to a worker at 
     // a time. 
     $this->channel->basic_qos(null, 1, null); 

     // Requires ack. 
     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback')); 

     while(count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function consumeCallback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     // Very important to ack, in order to remove msg from queue. Ack after 
     // callback, as exception might happen in callback. 
     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 

    public function getQueueSize() 
    { 
     // three tuple containing (<queue name>, <message count>, <consumer count>) 
     $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false); 
     if ($tuple != null && isset($tuple[1])) { 
      return $tuple[1]; 
     } 
     return -1; 
    } 
} 

public function producepublic function consume對按預期工作。

然而,當談到與延遲排隊系統

public function produceWithDelaypublic function consume對不能按預期工作。消費者撥打consume,無法收到任何物品,甚至等待一段時間。

我認爲我的produceWithDelay執行不正確。我可以知道這是什麼問題嗎?

+0

試圖聲明您的隊列作爲'$通道 - > queue_declare( 「名」,假的,假的,假的,真正的,真實的,陣列());'和交流,也許下一個跟隨這名[要點](https://gist.github.com/tairov/11289983) – Vardius

+0

沒有必要從頭開始實施它。這就是你應該這樣做https://stackoverflow.com/a/45549182/579025 –

回答

1

請注意。

我發現這是由我自己的bug造成的。

而不是

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

我應該把它寫在

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare(delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

我的成員變量尚未初始化正常。

完全可行的代碼如下,供您參考。

<?php 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      $this->channel->queue_declare($delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     $this->channel->basic_qos(null, 1, null); 

     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback')); 

     while (count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function callback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 
} 
3

首先確認您的插件rabbitmq_delayed_message_exchange通過運行命令啓用:rabbitmq-plugins list,如果不是 - 閱讀更多信息here

而且您必須更新您的__construct方法,因爲您需要以另一種方式稍微聲明隊列。我不想假裝更新您的結構,但願意提供我的簡單的例子:

申報隊列:

<?php 

require_once __DIR__ . '/../vendor/autoload.php'; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 
$channel = $connection->channel(); 
$args = new AMQPTable(['x-delayed-type' => 'fanout']); 
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); 
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']); 
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args); 
$channel->queue_bind('delayed_queue', 'delayed_exchange'); 

發送消息:

$data = 'Hello World at ' . date('Y-m-d H:i:s'); 
$delay = 7000; 
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); 
$headers = new AMQPTable(['x-delay' => $delay]); 
$message->set('application_headers', $headers); 
$channel->basic_publish($message, 'delayed_exchange'); 
printf(' [x] Message sent: %s %s', $data, PHP_EOL); 
$channel->close(); 
$connection->close(); 

接收消息:

$callback = function (AMQPMessage $message) { 
    printf(' [x] Message received: %s %s', $message->body, PHP_EOL); 
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); 
}; 
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback); 
while(count($channel->callbacks)) { 
    $channel->wait(); 
} 
$channel->close(); 
$connection->close(); 

你也可以找到源文件here
希望它能幫助你!