2013-07-12 122 views
0

我在創建一個簡單的任務隊列,其中包含名爲AMQP的RabbitMQ和PHP的PECL擴展。如何創建一個簡單的任務隊列

我的目標很簡單: 生產者應該將消息發送到包含需要處理的對象的包圍的特定隊列。

消費者都應該聽取所述隊列並處理消息。 我需要能夠添加更多的消費者,並讓RabbitMq以循環方式發送消息。

雖然這很容易找到python或java庫的教程,但我無法找到任何用於PHP的PECL庫。

我不太確定我是否應該綁定任何東西,我有一個自定義php庫的工作示例,該庫使用「basic_publish和basic_consume」,這些未在PECL庫中以這種方式實現。

所以這裏是我走到這一步: 出版商:

$oConfig = Zend_Registry::get('config'); 
$sQueue = $oConfig->amqp->validate_queue_name; 

$oConnection = new AMQPConnection(); 
$oConnection->setLogin($oConfig->amqp->login); 
$oConnection->setPassword($oConfig->amqp->pass); 
$oConnection->setVhost($oConfig->amqp->vhost); 
$oConnection->setPort($oConfig->amqp->port); 
$oConnection->connect(); 

$oChannel = new AMQPChannel($oConnection); 
$oExchange = new AMQPExchange($oChannel); 

$sMsg = new stdClass(); 
$sMsg->nId = $p_nId; 
$sMsg->nStatus= $p_nStatus; 
try { 
    $oChannel->startTransaction(); 
    $bResponse = $oExchange->publish($sMgs,$sQueue); 
    if (!$bResponse) { 
    echo "<h1>An error occured, the message can't be published</h1>"; 
    echo "<h3>Sorry i don't know why</h3>"; 
    exit; 
    } 
    $oChannel->commitTransaction(); 
} catch (Exception $oException) { 
    echo "<h1>An error occured, the message can't be published</h1>"; 
    echo "<h3>See error below</h3>"; 
    echo "<pre>"; 
    echo print_r($oException->getMessage()); 
    echo "</pre>"; 
    exit; 
} 

工人

$oConfig = Zend_Registry::get('config'); 
    $oConnection = new AMQPConnection(); 
    $oConnection->setLogin($oConfig->amqp->login); 
    $oConnection->setPassword($oConfig->amqp->pass); 
    $oConnection->setVhost($oConfig->amqp->vhost); 
    $oConnection->setPort($oConfig->amqp->port); 
    $oConnection->connect(); 

    $oChannel = new AMQPChannel($oConnection); 
    $oQueue = new AMQPQueue($oChannel); 

    $oQueue->declare($oConfig->amqp->validate_queue_name); 

    function processMessage($oMessage, $oQueue) { 
    $nId  = $msg->body->nId; 
    $nStatus = $msg->body->nStatus; 
    $oIniAct = $oActionMap->findBy('id',$nId); 

    $sReply = $oIniAct->updateStatusMisc($nStatus); 
    if ($sReply->status == $nStatus) { 
     $oQueue->ack($sMsg['delivery_tag']); 
    } else { 
    $oQueue->nack($sMsg['delivery_tag'],AMQP_REQUEUE); 
    } 
    } 

    $oQueue->consume("processMessage",AMQP_NOPARAM); 

什麼PHP DOC告訴我的是,消費()將鎖定爲大家根據線程?所以基本上我一次只能有一名工作人員? 另外我看到人們綁定隊列,但第一個基本消耗的工人示例,我看到沒有使用它。

正如你可以看到我很困惑,任何幫助/方向/教程ASO ......將有助於

感謝

回答

0

PHP具有同步特性,所以是的,consume()而基本將鎖定主線程邏輯是讀取套接字連接上的所有傳入數據,將其轉換爲PHP結構並饋送到您的消費者功能。

在github上討論了使php-amqp異步的問題,但我們都同意,如果有人需要異步功能,PHP並不是最好的語言。

就我個人而言,我多次運行消費者腳本(實際上我有平衡器),因此每個消費者不會相互影響,他們可能會失敗並獨立重新啓動。我認爲你也可以這樣做。

我一次運行消費者腳本作爲守護進程和平衡器腳本(實際上沒有真正的負載平衡器)監視消費者活動(通過memcache完成,不清楚,但WFM),並且當消費者平衡器上沒有活動時一個一個殺死他們(但至少有一個工作的消費者應該還活着)。當消費者超負荷均衡器腳本啓動更多的消費者。

如果您需要消費一條消息,然後死亡讓您的消費者功能返回false

如果您確定隊列中至少有一條消息可用,您可能需要使用AMQPQueue::get()方法,該方法不會阻塞(或至少不應該)您的主線程。

+0

你是什麼意思?您在負載均衡器後面運行使用者腳本?我不確定我明白。基本上,如果我只有兩個工人代碼的出現,它不會工作? 你如何做到這一點,你有你的工作人員只消費一條消息,並立即死亡? 所有這些對我來說都很不清楚:s – user1159791

+0

更新了我的答案。也許get()會適合你的需求呢? – pinepain

+0

其實我認爲我可以阻止我的主線程。我想我明白,多個消費者將能夠以一種合理的方式工作。 基本上這是我的一個誤解,你和一堆閱讀clarfied。 謝謝! – user1159791