我使用同樣的消息爲例消費者張貼在這裏:永無止境的消息循環:Python中的RabbitMQ消費者交還
http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html
我用ExampleConsumer的原因是我對RabbitMQ的連接是失敗時的工作任務開始花更長的時間,其中更長的時間超過10分鐘。在長時間運行的任務完成並且該過程失敗之後,該連接表示關閉。它以前通過1000條消息,花了一分鐘左右的罰款。
ExampleConsumer似乎重新連接好,但是,在確認消息中,由於連接已經死亡,消息並未實際得到確認。它似乎通常從下面的確認消息方法返回。然後嘗試重新連接,之後剛剛完成的消息被重新傳遞。
def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)
如果您查看上面的異步示例,我正在使用SelectConnection。我沒有看到在該連接類型中可配置的心跳信號。 –
這是否有process_data_events()http://pika.readthedocs.org/en/latest/modules/adapters/select.html –
@JustinThomas,SelectConnection也沒有後臺線程。請參閱'pika.connection.ConnectionParameters'。您可以將ConnectionParameters傳遞給任何pika的連接。其中一個選項是將'heartbeat_interval = 0'傳遞給'ConnectionParameters'構造函數來禁用心跳。 – user1778420