2015-11-16 33 views
0

我使用同樣的消息爲例消費者張貼在這裏:永無止境的消息循環:Python中的RabbitMQ消費者交還

http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html

我用ExampleConsumer的原因是我對RabbitMQ的連接是失敗時的工作任務開始花更長的時間,其中更長的時間超過10分鐘。在長時間運行的任務完成並且該過程失敗之後,該連接表示關閉。它以前通過1000條消息,花了一分鐘左右的罰款。

ExampleConsumer似乎重新連接好,但是,在確認消息中,由於連接已經死亡,消息並未實際得到確認。它似乎通常從下面的確認消息方法返回。然後嘗試重新連接,之後剛剛完成的消息被重新傳遞。

def acknowledge_message(self, delivery_tag): 
     """Acknowledge the message delivery from RabbitMQ by sending a 
     Basic.Ack RPC method for the delivery tag. 

     :param int delivery_tag: The delivery tag from the Basic.Deliver frame 

     """ 
     LOGGER.info('Acknowledging message %s', delivery_tag) 
     self._channel.basic_ack(delivery_tag) 

回答

1

RabbitMQ代理實現默認的心跳超時,根據RabbitMQ版本的不同,可以是~10分鐘或1分鐘;較短的默認值是最新版本,以RabbitMQ v3.5.5開頭。應用程序可以通過連接參數傳遞一個明確的更長的心跳超時偏好。 Pika的SelectConnection沒有後臺線程,因此當工作任務花費的時間超過檢測信號超時時間時,SelectConnection將無法在代理預期的時間限制內服務檢測信號,代理將丟棄該連接。有多種方法可以嘗試解決此問題:

  1. 通過pika.connection.ConnectionParameters(可能是最簡單的)設置較長的心跳超時優先級。 ConnectionParameters.heartbeat_interval = 0應該完全禁用心跳(和心跳超時)。
  2. 在與任務處理邏輯分離的線程上運行連接
  3. 切換到協作式多任務連接類型之一,例如Pika中的Tornado或基於Twisted框架的適配器或Haigha中基於gevent的適配器。這種改變需要任務處理邏輯對協作式多任務處理很友善。
+0

如果您查看上面的異步示例,我正在使用SelectConnection。我沒有看到在該連接類型中可配置的心跳信號。 –

+0

這是否有process_data_events()http://pika.readthedocs.org/en/latest/modules/adapters/select.html –

+0

@JustinThomas,SelectConnection也沒有後臺線程。請參閱'pika.connection.ConnectionParameters'。您可以將ConnectionParameters傳遞給任何pika的連接。其中一個選項是將'heartbeat_interval = 0'傳遞給'ConnectionParameters'構造函數來禁用心跳。 – user1778420

0

您可能需要add a heartbeat to your message consumer來保持您的連接處於活動狀態。

如果rabbitmq認爲消費者在消息處於「未確認」模式(仍在處理中)時死亡,它會將消息放回隊列中。有心跳可能有助於保持連接活躍,防止這種情況發生。

+0

我使用SelectConnection並沒有看到有關對心跳東西。 –

0

,如果您使用的是鼠兔異步消費者舉例來說,你只需要這個變化添加到init方法:

self._url = 'amqp://{}:{}@{}:{}/%2F{}'.format(
        self.USERNAME, self.PASSWORD, self.ADDRESS, self.PORT, self.QUERY) 

與self.QUERY可參數化設置不同的參數字符串,心跳如下:

self.QUERY ='?heartbeat_interval=600' 

connect方法將負責心跳事務。

def connect(self): 
    """This method connects to RabbitMQ, returning the connection handle. 
    When the connection is established, the on_connection_open method 
    will be invoked by pika. 

    :rtype: pika.SelectConnection 

    """ 
    LOGGER.info('Connecting to %s', self._url) 
    return pika.SelectConnection(parameters=pika.URLParameters(self._url), 
           on_open_callback=self.on_connection_open, 
           on_open_error_callback=self.on_connection_error, 
           stop_ioloop_on_close=False, 
           ) 

這是告訴RabbitMQ心跳與您的消費者關聯的一種非常好的方式。請注意,RabbitMQ將強制它至少60秒。因此你不能將它設置得更低。這些連接參數

更多信息: https://pika.readthedocs.io/en/latest/modules/parameters.html