爲了與RabbitMQ的管理我的員工我用下面的庫:
https://github.com/php-amqplib/php-amqplib
然後,我創建定義如何我的工人應該作品(包含所有RabbitMQ的邏輯),它給我一個班類似的東西:
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
abstract class QueueAMQPConsumer
{
protected $connection;
protected $debug;
protected $queueName;
protected $exchange;
public function __construct(AMQPStreamConnection $AMQPConnection, $queueName, $exchange = null)
{
$this->connection = $AMQPConnection;
$this->queueName = $queueName;
$this->exchange = $exchange;
}
public function run($debug = false)
{
$this->debug = $debug;
$channel = $this->connection->channel();
if ($this->exchange !== null) {
$channel->exchange_declare($this->exchange, "topic", false, true, false);
}
$channel->queue_declare($this->queueName, false, true, false, false);
if ($this->exchange !== null) {
$channel->queue_bind($this->queueName, $this->exchange);
}
$channel->basic_qos(null, 1, null);
$channel->basic_consume($this->queueName, '', false, false, false, false, [$this, 'callback']);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$this->connection->close();
}
final function callback(AMQPMessage $message)
{
$result = $this->process($message);
if (false === $result) {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
} else {
$message->delivery_info['channel']->basic_ack($message-> delivery_info['delivery_tag']);
}
}
/**
* @param AMQPMessage $message
*
* @return bool
*/
abstract protected function process(AMQPMessage $message);
}
該類允許設置隊列,交換(在這種情況下,主題),服務質量(你可以自定義所有這些參數,它只是一個例子)等。
然後它會在回調中循環。這裏的回調是抽象方法進程(...),它將在需要處理隊列的不同工作人員上實現。所以的責任「循環/聽」是渠道上:$channel->wait();
然後,我將創建一個需要處理隊列中的消息的子類:
class MyWorker extends QueueAMQPConsumer
{
protected function process(AMQPMessage $message)
{
// .... process your message here
}
}
那麼工人會聽你的一直排隊,並在他們到達隊列時處理這些消息。 如果您的process(...)
返回其他內容而不是false,則會確認該消息。
你只需要啓動你的類像:
$consumer = new MyWorker(....);
$consumer->run();