我有一個使用RabbitMQ的PHP應用程序。爲了實現冗餘,我創建了一對RabbitMQ服務器並將它們集成在一起。我還有一個運行HAProxy的VyOS故障轉移羣集來負載均衡連接,並在發生故障轉移時提供重新路由。php rabbitmq消費者重新連接
昨天,我們的VyOS集羣決定需要故障切換(可能是短暫的網絡中斷)。 HAproxy在一個VyOS上停止,虛擬IP移動,並在另一個節點上重新啓動HAproxy。
在此之後,我查看了兔子的隊列,看到每個隊列都有零個消費者。我檢查了運行消費者的機器仍然運行着PHP。我留下了他們一段時間,看看他們是否會重新連接,他們沒有。我不得不殺掉PHP腳本並重新啓動它們,然後重新連接並立即開始消耗。
我認爲RabbitMQ和HAproxy正在按預期工作......現在我需要PHP使用者支持故障轉移事件...換句話說,它不需要掛起,而是需要檢測斷開連接並自動重新連接。
這是我的RabbitMQ類。感謝您提前提供任何幫助!
<?php
while(true)
{
try{getMessages("transcode2");}
catch(Exception $e){echo($e->getMessage()."\n");}
sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
global $channel;
$msg=json_encode($msg);
$queue="transcode2";
$channel->queue_declare($queue,true,false,false,false);
$channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
global $connection,$channel;
$connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
$channel=$connection->channel();
$channel->queue_declare($queue,true,false,false,false);
$callback=function($msg)
{
if(handleMessage(json_decode($msg->body,true)))
{
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
else
{
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
}
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue,'',false,false,false,false,$callback);
while(count($channel->callbacks))
{
try{$channel->wait();}
catch(Exception $e)
{
break;
}
}
$channel->close();
$connection->close();
}
?>
我改變了$ channel-> wait();到$ channel-> wait(null,false,60);它的工作,但值得注意的是,它不保留在while(count($ channel-> callbacks))循環中,它退出getmessage函數關閉連接,並在getmessages在頂部運行時重新啓動整個連接while循環。即使有很多重新連接的開銷,我猜也是可以的。 –