2016-07-07 179 views
1

我有一個使用RabbitMQ的PHP應用程序。爲了實現冗餘,我創建了一對RabbitMQ服務器並將它們集成在一起。我還有一個運行HAProxy的VyOS故障轉移羣集來負載均衡連接,並在發生故障轉移時提供重新路由。php rabbitmq消費者重新連接

昨天,我們的VyOS集羣決定需要故障切換(可能是短暫的網絡中斷)。 HAproxy在一個VyOS上停止,虛擬IP移動,並在另一個節點上重新啓動HAproxy。

在此之後,我查看了兔子的隊列,看到每個隊列都有零個消費者。我檢查了運行消費者的機器仍然運行着PHP。我留下了他們一段時間,看看他們是否會重新連接,他們沒有。我不得不殺掉PHP腳本並重新啓動它們,然後重新連接並立即開始消耗。

我認爲RabbitMQ和HAproxy正在按預期工作......現在我需要PHP使用者支持故障轉移事件...換句話說,它不需要掛起,而是需要檢測斷開連接並自動重新連接。

這是我的RabbitMQ類。感謝您提前提供任何幫助!

<?php 
while(true) 
{ 
    try{getMessages("transcode2");} 
    catch(Exception $e){echo($e->getMessage()."\n");} 
    sleep(1); 
} 
require_once("../api/db.php"); 
require_once("../vendor/autoload.php"); 
use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
function sendMessage($msg,$prio) 
{ 
    global $channel; 
    $msg=json_encode($msg); 
    $queue="transcode2"; 
    $channel->queue_declare($queue,true,false,false,false); 
    $channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue); 
} 
function getMessages($queue) 
{ 
    global $connection,$channel; 
    $connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD); 
    $channel=$connection->channel(); 
    $channel->queue_declare($queue,true,false,false,false); 
    $callback=function($msg) 
    { 
     if(handleMessage(json_decode($msg->body,true))) 
     { 
      $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
     } 
     else 
     { 
      $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true); 
     } 
    }; 
    $channel->basic_qos(null,1,null); 
    $channel->basic_consume($queue,'',false,false,false,false,$callback); 
    while(count($channel->callbacks)) 
    { 
     try{$channel->wait();} 
     catch(Exception $e) 
     { 
      break; 
     } 
    } 
    $channel->close(); 
    $connection->close(); 
} 
?> 

回答

0

如果你使用timeout參數給$ channel-> wait();

+0

我改變了$ channel-> wait();到$ channel-> wait(null,false,60);它的工作,但值得注意的是,它不保留在while(count($ channel-> callbacks))循環中,它退出getmessage函數關閉連接,並在getmessages在頂部運行時重新啓動整個連接while循環。即使有很多重新連接的開銷,我猜也是可以的。 –

0

零超時無法正常工作,正如您所說的,代理可能會關閉連接,PHP消費者不會發現它。

解決方案將使用沒有零接收超時。確保連接超時大於接收超時。

下面是基於AMQP Interop一個例子:

安裝AMQP互操作兼容的傳輸,例如:

composer require enqueue/amqp-bunny 

的代碼做同樣的事情,你有明確的設定超時:

<?php 
use Enqueue\AmqpBunny\AmqpConnectionFactory; 
use Interop\Amqp\AmqpConsumer; 
use Interop\Amqp\AmqpMessage; 
use Interop\Amqp\AmqpQueue; 

$context = (new AmqpConnectionFactory(sprintf(
    'amqp://%s:%[email protected]%s:%s/%2f?connection_timeout=600', // 10 min 
    RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT 
)))->createContext(); 

$context->setQos(null,1,null); 

//sendMessage 

$queue = $context->createQueue("transcode2"); 
$queue->addFlag(AmqpQueue::FLAG_PASSIVE); 
$context->declareQueue($queue); 

$message = $context->createMessage(json_encode($msg)); 
$message->setPriority($prio); 

$producer = $context->createProducer(); 
$producer->send($queue, $message); 

// getMessages 

$consumer = $context->createConsumer($queue); 
$context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) { 
    if(handleMessage(json_decode($message->getBody(), true))) { 
     $consumer->acknowledge($message); 
    } else { 
     $consumer->reject($message); 
    } 

    return true; 
}); 

$receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now. 

$context->consume($receiveTimeout);