2013-12-18 80 views
6

我正在使用basic_consume()接收消息,並且使用basic_cancel取消消費,但存在問題。如何在rabbitmq中暫停並恢復消費優雅,pika python

這裏是pika.channel

def basic_consume(self, consumer_callback, queue='', no_ack=False, 
         exclusive=False, consumer_tag=None): 
     """Sends the AMQP command Basic.Consume to the broker and binds messages 
     for the consumer_tag to the consumer callback. If you do not pass in 
     a consumer_tag, one will be automatically generated for you. Returns 
     the consumer tag. 

     For more information on basic_consume, see: 
     http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume 

     :param method consumer_callback: The method to callback when consuming 
     :param queue: The queue to consume from 
     :type queue: str or unicode 
     :param bool no_ack: Tell the broker to not expect a response 
     :param bool exclusive: Don't allow other consumers on the queue 
     :param consumer_tag: Specify your own consumer tag 
     :type consumer_tag: str or unicode 
     :rtype: str 

     """ 
     self._validate_channel_and_callback(consumer_callback) 

     # If a consumer tag was not passed, create one 
     consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_number, 
                 uuid.uuid4().get_hex()) 

     if consumer_tag in self._consumers or consumer_tag in self._cancelled: 
      raise exceptions.DuplicateConsumerTag(consumer_tag) 

     self._consumers[consumer_tag] = consumer_callback 
     self._pending[consumer_tag] = list() 
     self._rpc(spec.Basic.Consume(queue=queue, 
            consumer_tag=consumer_tag, 
            no_ack=no_ack, 
            exclusive=exclusive), 
          self._on_eventok, 
          [(spec.Basic.ConsumeOk, 
          {'consumer_tag': consumer_tag})]) 

     return consumer_tag 

def basic_cancel(self, callback=None, consumer_tag='', nowait=False): 
     """This method cancels a consumer. This does not affect already 
     delivered messages, but it does mean the server will not send any more 
     messages for that consumer. The client may receive an arbitrary number 
     of messages in between sending the cancel method and receiving the 
     cancel-ok reply. It may also be sent from the server to the client in 
     the event of the consumer being unexpectedly cancelled (i.e. cancelled 
     for any reason other than the server receiving the corresponding 
     basic.cancel from the client). This allows clients to be notified of 
     the loss of consumers due to events such as queue deletion. 

     :param method callback: Method to call for a Basic.CancelOk response 
     :param str consumer_tag: Identifier for the consumer 
     :param bool nowait: Do not expect a Basic.CancelOk response 
     :raises: ValueError 

     """ 
     self._validate_channel_and_callback(callback) 
     if consumer_tag not in self.consumer_tags: 
      return 
     if callback: 
      if nowait is True: 
       raise ValueError('Can not pass a callback if nowait is True') 
      self.callbacks.add(self.channel_number, 
           spec.Basic.CancelOk, 
           callback) 
     self._cancelled.append(consumer_tag) 
     self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, 
            nowait=nowait), 
        self._on_cancelok, 
        [(spec.Basic.CancelOk, 
        {'consumer_tag': consumer_tag})] if nowait is False else []) 

正如你可以看到每一個我取消消費consumer_tag時間的代碼添加到列表_canceled。如果我再次在basic_consume中使用此標記,則會引發duplicateConsumer異常。 那麼,我可以每次使用一個新的consumer_tag,但事實上我不是。因爲早晚產生的標籤將與之前的標籤完全匹配。

我應該如何在pika中優雅地暫停和恢復消費?

回答

1

你有什麼理由爲自己定義你自己的consumer_tags?您可以傳遞一個空字符串,並讓RabbitMQ爲您生成消費者標籤。來自basic.consume的回覆,即basic.consume-ok將返回生成的consumer_tag,以便您稍後可以使用它來停止使用。

參見:http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume-ok

+0

從附加的pika模塊代碼中可以看出,即使在客戶端應用程序沒有指定消費者標籤的情況下,pika模塊也會生成一個並將其包含在消費請求中。 – mike

1

這看起來像鼠兔是做更多比它應該 - 它並不需要創建一個消費者標記,如果沒有提供一個(服務器會),它也並不需要觀察重複的消費者標籤(服務器支持使用相同標籤恢復)。

所以我不知道如何用Pika來做到這一點 - 我提供了一個錯誤。