RabbitMQ 在一段時間後斷開了我的連接。我試圖持續監聽一個隊列,但大約一分鐘後(假設我的隊列爲空),連接就被斷開,並出現以下錯誤:
DEBUG:pika.adapters.blocking_connection:Outbound buffer size: 0
DEBUG:pika.adapters.blocking_connection:Outbound buffer size: 0
ERROR:pika.adapters.base_connection:Read empty data, calling disconnect
DEBUG:pika.adapters.blocking_connection:Handling disconnect
INFO:pika.adapters.blocking_connection:on_connection_closed: None, True
WARNING:pika.adapters.blocking_connection:Received Channel.Close, closing: None
DEBUG:pika.callback:Clearing out '1' from the stack
Traceback (most recent call last):
File "controller.py", line 59, in <module>
c.run()
File "controller.py", line 55, in run
self.listen_queue() # Blocking function
File "controller.py", line 25, in listen_queue
self.channel.start_consuming() # Start consuming
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 814, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 168, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 272, in _handle_read
super(BlockingConnection, self)._handle_read()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 315, in _handle_read
return self._handle_disconnect()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 263, in _handle_disconnect
self._on_connection_closed(None, True)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 321, in _on_connection_closed
self._channels[channel]._on_close(method_frame)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 914, in _on_close
raise exceptions.ChannelClosed(0, 'Not specified')
pika.exceptions.ChannelClosed: (0, 'Not specified')
這是我的代碼:
class RabbitConnector(object):
    """Open a blocking pika connection and prepare a channel for consuming.

    On construction the instance keeps retrying until the broker connection,
    channel QoS, queue declaration and queue binding have all succeeded.
    All settings are read from the module-level ``conf`` object.

    Attributes:
        connection: the open ``pika.BlockingConnection`` (``None`` until
            connected). Kept so the owner can close it explicitly.
        channel: the channel configured on ``connection`` (``None`` until
            connected).
    """

    # Seconds to wait between two connection attempts.
    RETRY_DELAY = 3

    def __init__(self):
        self.connection = None
        self.channel = None
        self._connect()

    def _connect(self):
        """Connect to RabbitMQ, retrying until the channel is fully set up.

        Retries on ``AMQPConnectionError`` and ``AMQPChannelError``; any
        other exception propagates to the caller.
        """
        logger.info('Trying to connect to RabbitMQ')
        while True:
            conn_broker = None
            try:
                conn_broker = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        host=conf.rabbit_server,
                        port=conf.rabbit_port,
                        virtual_host=conf.rabbit_vhost,
                        ssl=conf.rabbit_ssl,  # do not set it to True if there is no ssl!
                        heartbeat_interval=conf.rabbit_heartbeat_interval,
                        credentials=pika.PlainCredentials(
                            conf.rabbit_user,
                            conf.rabbit_pass)))
                logger.info('Successfully connected to Rabbit at %s:%s',
                            conf.rabbit_server, conf.rabbit_port)
                channel = conn_broker.channel()
                # Don't dispatch a new message to a worker until it has
                # processed and acknowledged the previous one
                channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
                status = channel.queue_declare(
                    queue=conf.rabbit_queue_name,
                    durable=conf.rabbit_queue_durable,
                    exclusive=conf.rabbit_queue_exclusive,
                    passive=conf.rabbit_queue_passive)
                if status.method.message_count == 0:
                    logger.info("Queue empty")
                else:
                    logger.info('Queue status: %s', status)
                channel.queue_bind(
                    queue=conf.rabbit_queue_name,
                    exchange=conf.rabbit_exchange_name,
                    routing_key=conf.rabbit_exchange_routing_key)
            except (pika.exceptions.AMQPConnectionError,
                    pika.exceptions.AMQPChannelError) as e:
                # Report the failure right away, not after the delay.
                logger.error('Exception while connecting to Rabbit %s', e)
                # The connection may have opened before the channel setup
                # failed; close it so retries do not pile up open sockets.
                self._close_quietly(conn_broker)
                time.sleep(self.RETRY_DELAY)
            else:
                # Publish the attributes only once everything succeeded, so
                # callers never see a half-configured channel.
                self.connection = conn_broker
                self.channel = channel
                break

    @staticmethod
    def _close_quietly(connection):
        """Best-effort close of *connection*; does nothing for ``None``."""
        if connection is None:
            return
        try:
            connection.close()
        except Exception as close_error:
            # Cleanup of an already-broken connection must not mask the
            # original failure or stop the retry loop.
            logger.debug('Ignoring error while closing Rabbit connection: %s',
                         close_error)

    def get_channel(self):
        """Return the channel prepared by ``_connect``."""
        return self.channel
我也遇到了這個錯誤,這不是我想要或期望的行爲。我打算嘗試以同步方式輪詢隊列,而不是使用 basic_consume 與 start_consuming 的慣用寫法。 – user2759991