使用鼠圖書館的BlockingConnection
連接到RabbitMQ的,發佈消息時,我偶爾會得到一個錯誤:RabbitMQ的水管壞了錯誤或丟失的郵件
Fatal Socket Error: error(32, 'Broken pipe')
這是一個非常簡單的子過程,出需要的一些信息內存中的隊列,併發送一個小的JSON消息到AMQP中。只有當系統幾分鐘內沒有發送任何消息時,錯誤纔會出現。
設置:
connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
exchange='xyz',
exchange_type='fanout',
passive=False,
durable=True,
auto_delete=False
)
排隊代碼捕獲的任何連接錯誤和重試:
def _enqueue(self, message_id, data):
try:
published = self.channel.basic_publish(
self.amqp_exchange,
self.amqp_routing_key,
json.dumps(data),
pika.BasicProperties(
content_type="application/json",
delivery_mode=2,
message_id=message_id
)
)
# Confirm delivery or retry
if published:
self.retry_count = 0
else:
raise EnqueueException("Message publish not confirmed.")
except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
pika.exceptions.UnroutableError, socket.timeout) as e:
self.retry_count += 1
if self.retry_count < 5:
logging.warning("Reconnecting and resending")
if self.connection.is_open:
self.connection.close()
self.connect()
self._enqueue(message_id, data)
else:
raise e
這有時工作的第二次嘗試。它經常掛起一段時間,或者在最終拋出異常之前丟棄消息(possibly related bug report)。由於它只發生在系統安靜幾分鐘時,我猜這是由於連接超時。但AMQP有一個心跳系統,據報道pika使用它(related bug report)。
爲什麼我會收到此錯誤或丟失消息,以及爲什麼在不使用時連接保持打開狀態?
那麼這意味着連接關閉在我身邊而不是兔子的? 'self.connection'是運行在其自己進程中的對象的一部分,因此該連接不與任何其他線程或進程共享。 –