2017-07-12 66 views
2

使用鼠圖書館的BlockingConnection連接到RabbitMQ的,發佈消息時,我偶爾會得到一個錯誤:RabbitMQ的水管壞了錯誤或丟失的郵件

Fatal Socket Error: error(32, 'Broken pipe')

這是一個非常簡單的子過程,出需要的一些信息內存中的隊列,併發送一個小的JSON消息到AMQP中。只有當系統幾分鐘內沒有發送任何消息時,錯誤纔會出現。

設置:

connection = pika.BlockingConnection(parameters) 
channel = self.connection.channel() 
channel.exchange_declare(
    exchange='xyz', 
    exchange_type='fanout', 
    passive=False, 
    durable=True, 
    auto_delete=False 
) 

排隊代碼捕獲的任何連接錯誤和重試:

def _enqueue(self, message_id, data): 
    try: 
     published = self.channel.basic_publish(
      self.amqp_exchange, 
      self.amqp_routing_key, 
      json.dumps(data), 
      pika.BasicProperties(
       content_type="application/json", 
       delivery_mode=2, 
       message_id=message_id 
      ) 
     ) 

     # Confirm delivery or retry 
     if published: 
      self.retry_count = 0 
     else: 
      raise EnqueueException("Message publish not confirmed.") 

    except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError, 
      pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError, 
      pika.exceptions.UnroutableError, socket.timeout) as e: 
     self.retry_count += 1 
     if self.retry_count < 5: 
      logging.warning("Reconnecting and resending") 
      if self.connection.is_open: 
       self.connection.close() 
      self.connect() 
      self._enqueue(message_id, data) 
     else: 
      raise e 

這有時工作的第二次嘗試。它經常掛起一段時間,或者在最終拋出異常之前丟棄消息(possibly related bug report)。由於它只發生在系統安靜幾分鐘時,我猜這是由於連接超時。但AMQP有一個心跳系統,據報道pika使用它(related bug report)。

爲什麼我會收到此錯誤或丟失消息,以及爲什麼在不使用時連接保持打開狀態?

回答

1

從另一個bug report

As BlockingConnection doesn't handle heartbeats in the background and the heartbeat_interval can't override the servers suggested heartbeat interval (that's a bug too), i suggest that heartbeats should be disabled by default (rely on TCP keep-alive instead).

If processing a task in a consume block takes longer time then the server suggested heartbeat interval, the connection will be closed by the server and the client won't be able to ack the message when it's done processing.

未發行update可能這個問題有所幫助。

所以我實施了一個解決方法。每隔30秒我會通過隊列發佈心跳消息。這樣可以保持連接的開放性,並且還可以向客戶確認我的應用程序已啓動並正在運行。

0

Broken Pipe錯誤意味着服務器正試圖在客戶端連接關閉時將某些內容寫入套接字。

正如我所看到的,你有一些共享的「self.connection」可能會在並行線程之前關閉?

您還可以將日誌級別設置爲DEBUG,並查看客戶端的日誌以確定客戶端關閉連接的時刻。

+0

那麼這意味着連接關閉在我身邊而不是兔子的? 'self.connection'是運行在其自己進程中的對象的一部分,因此該連接不與任何其他線程或進程共享。 –