2016-09-07 76 views
0

假設我有經常充滿一些數據的兔子隊列(例如,用戶提供了一些我們需要稍後分析的動作)。每秒鐘新增30個項目。 我需要的是創建一個工作人員,通過這個隊列來查看這個數據並執行一些任務。我可以這樣做:創建php rabbit worker的最佳方法

class Worker 
{ 

    public function run() 
    { 
     $queue = new Queue('exchange', 'queue'); 
     while (true) 
     { 
      $queue->processQueue(); 
     } 
    } 
} 

而不僅僅是在服務器上運行worker.php,似乎工作。

但我想知道,如果這個無限循環會給我的兔子實例增加額外的負載,如果沒有數據要繼續?也許更好的辦法是做像水木清華

class Worker 
{ 
    CONST IDLE = 5; 

    private $start = 0; 

    public function run() 
    { 
     $this->start = time(); 

     $queue = new Queue('exchange', 'queue'); 
     while (true) 
     { 
      $queue->processQueue(); 

      //don't allow this worker to be working a lot 
      if (time() - $this->start >= 60 * 60 - self::IDLE) 
      { 
       break; 
      } 

      sleep(self::IDLE); 
     } 

     $queue->close(); 
    } 
} 

所以我的工人便不會從兔不斷,但睡眠數據一陣子。在工作一小時後,它會停止工作,另一個工作實例將被crontab作業或其他作業調用。

回答

2

爲了與RabbitMQ的管理我的員工我用下面的庫:

https://github.com/php-amqplib/php-amqplib

然後,我創建定義如何我的工人應該作品(包含所有RabbitMQ的邏輯),它給我一個班類似的東西:

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 

abstract class QueueAMQPConsumer 
{  
    protected $connection; 

    protected $debug; 

    protected $queueName; 

    protected $exchange; 

    public function __construct(AMQPStreamConnection $AMQPConnection, $queueName, $exchange = null) 
    { 
     $this->connection = $AMQPConnection; 
     $this->queueName = $queueName; 
     $this->exchange = $exchange; 
    } 

    public function run($debug = false) 
    { 
     $this->debug = $debug; 
     $channel = $this->connection->channel(); 
     if ($this->exchange !== null) { 
      $channel->exchange_declare($this->exchange, "topic", false, true, false); 
     } 

     $channel->queue_declare($this->queueName, false, true, false, false); 
     if ($this->exchange !== null) { 
      $channel->queue_bind($this->queueName, $this->exchange); 
     } 

     $channel->basic_qos(null, 1, null); 
     $channel->basic_consume($this->queueName, '', false, false, false, false, [$this, 'callback']); 

     while (count($channel->callbacks)) { 
      $channel->wait(); 
     } 

     $channel->close(); 
     $this->connection->close(); 
    } 


    final function callback(AMQPMessage $message) 
    { 
     $result = $this->process($message); 

     if (false === $result) { 
      $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true); 
     } else { 
      $message->delivery_info['channel']->basic_ack($message-> delivery_info['delivery_tag']); 
     } 
    } 

    /** 
    * @param AMQPMessage $message 
    * 
    * @return bool 
    */ 
    abstract protected function process(AMQPMessage $message); 
} 

該類允許設置隊列,交換(在這種情況下,主題),服務質量(你可以自定義所有這些參數,它只是一個例子)等。

然後它會在回調中循環。這裏的回調是抽象方法進程(...),它將在需要處理隊列的不同工作人員上實現。所以的責任「循環/聽」是渠道上:$channel->wait();

然後,我將創建一個需要處理隊列中的消息的子類:

class MyWorker extends QueueAMQPConsumer 
{ 
    protected function process(AMQPMessage $message) 
    { 
     // .... process your message here 
    } 
} 

那麼工人會聽你的一直排隊,並在他們到達隊列時處理這些消息。 如果您的process(...)返回其他內容而不是false,則會確認該消息。

你只需要啓動你的類像:

$consumer = new MyWorker(....);  
$consumer->run();