2013-07-15 96 views
11

我已經創建了一個簡單的發佈者和消費者,使用basic.consume在隊列上訂閱。消費不確認消息來自RabbitMq

當作業運行時,我的客戶承認消息沒有異常。每當我遇到異常時,我都不會回覆信息並提前回復。只有已確認的消息從隊列中消失,因此工作正常。
現在我想要消費者再次拿起失敗的消息,但重新構建這些消息的唯一方法是重新啓動消費者。

我該如何處理這個用例?

設置代碼

$channel = new AMQPChannel($connection); 

$exchange = new AMQPExchange($channel); 

$exchange->setName('my-exchange'); 
$exchange->setType('fanout'); 
$exchange->declare(); 

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declare(); 
$queue->bind('my-exchange'); 

消費者代碼

$queue->consume(array($this, 'callback')); 

public function callback(AMQPEnvelope $msg) 
{ 
    try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return; 
    } 
    return $queue->ack($msg->getDeliveryTag()); 
} 

生產者代碼

$exchange->publish('message'); 
+0

你使用哪種語言,你能提供一些代碼? – pinepain

+0

@ zaq178miami,看到我編輯的信息 –

+0

@Bram_Gerritsen,看到我的答案更新 – pinepain

回答

15

如果消息w ^因爲未確認且申請失敗,它將自動重新發送並且信封上的redelivered屬性將設置爲true(除非您使用no-ack = true標誌消耗它們)。

UPD:

你必須nack消息還船旗在你的catch塊

try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); 
    } 

謹防無限NACK的消息同時交付計不RabbitMQ的和AMQP協議實施在所有。

如果你不想惹這些消息,只是想增加你可能想之前nack方法調用添加一些sleep()usleep()一些延遲,但它是不是一個好主意。

有多種技術來處理週期重新提交問題:

1.依靠Dead Letter Exchanges

  • 優點:可靠的,標準的,明確的
  • 缺點:需要額外的邏輯

2。使用per message or per queue TTL

  • 優點:易於實現,也是標準的,明確的
  • 缺點:人龍,你可能會失去一些消息

例子(注意,對於隊列TTL我們通過只數和消息TTL - 任何將數字串):

2.1%消息的TTL:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'expiration' => '1000' 
    ) 
); 

2.2。每個隊列TTL:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->setArgument('x-message-ttl', 1000); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish('message at ' . microtime(true)); 

3.按住重新傳送數或左重新傳送號(又名一跳IP堆棧限制或TTL),在郵件正文或頭

  • 優點:給你額外的控制on消息生存時間應用級別
  • 缺點:重大的開銷,而您必須修改消息並重新發布它,應用程序特定的,不清楚

代碼:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'headers' => array(
      'ttl' => 100 
     ) 
    ) 
); 

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { 
     $headers = $msg->getHeaders(); 
     echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; 
     echo $msg->getDeliveryTag(), ' '; 
     echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; 
     echo $msg->getBody(), PHP_EOL; 

     try { 
      //Do some business logic 
      throw new Exception('business logic failed'); 
     } catch (Exception $ex) { 
      //Log exception 
      if (isset($headers['ttl'])) { 
       // with ttl logic 

       if ($headers['ttl'] > 0) { 
        $headers['ttl']--; 

        $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); 
       } 

       return $queue->ack($msg->getDeliveryTag()); 
      } else { 
       // without ttl logic 
       return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue 
      } 

     } 

     return $queue->ack($msg->getDeliveryTag()); 
    } 
); 

有可能是一些其他的方式,以更好地控制消息重新傳送流。

結論:沒有銀子彈解決方案。你必須決定什麼解決方案適合你的需要最好或找出其他的東西,但不要忘記在這裏分享;)

+0

感謝您的回答。 'redelivered'確實設置爲'true',但我必須重新啓動阻止消費者才能重新獲取消息。 –

+0

謝謝,這正是我需要的。你能給我一些方向/建議如何防止無限重傳信息?如果我能夠將隊列中的隊列延遲一段給定的時間,那將會很好,所以我不會讓我的服務器超載。 –

+0

在這裏你再次更新答案 – pinepain

0

如果你不想重新啓動消費者,然後basic.recover AMQP命令可能是你想。根據AMQP protocol

basic.recover(bit requeue) 

Redeliver unacknowledged messages. 

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
+0

此方法似乎不是我正在使用的客戶端API的一部分。 http://www.php.net/manual/en/book.amqp.php –

+1

RabbitMQ對此方法有部分支持,請參閱[官方文檔](https://www.rabbitmq.com/specification.html#方法狀態-basic.recover) – pinepain