我正在尋找一種將kombu用作tornado-sockjs和Django應用程序服務器之間的MQ適配器的方法。我做了這樣的事情:非阻塞方式的Kombu
class BrokerClient(ConsumerMixin):
clients = []
def __init__(self):
self.connection = BrokerConnection(settings.BROKER_URL)
self.io_loop = ioloop.IOLoop.instance()
self.queue = sockjs_queue
self._handle_loop()
@staticmethod
def instance():
if not hasattr(BrokerClient, '_instance'):
BrokerClient._instance = BrokerClient()
return BrokerClient._instance
def add_client(self, client):
self.clients.append(client)
def remove_client(self, client):
self.clients.remove(client)
def _handle_loop(self):
try:
if self.restart_limit.can_consume(1):
for _ in self.consume(limit=5):
pass
except self.connection.connection_errors:
print ('Connection to broker lost. '
'Trying to re-establish the connection...')
self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)
def get_consumers(self, Consumer, channel):
return [Consumer([self.queue, ], callbacks=[self.process_task])]
def process_task(self, body, message):
for client in self.clients:
if hasattr(body, 'users') and client.user.pk in body.users:
client.send(body)
message.ack()
但是,在_handle_loop執行時(如預期的)龍捲風被阻止。
有什麼辦法可以防止這種情況發生?
我知道Tornado的Pika庫適配器,但我想使用kombu,因爲它已用於項目並具有靈活的傳輸。
UPDATE:
更改_handle_loop到發生器功能
def drain_events(self, callback):
with self.Consumer() as (connection, channel, consumers):
with self.extra_context(connection, channel):
try:
connection.drain_events(timeout=1)
except:
pass
callback(None)
@tornado.gen.engine
def _handle_loop(self):
response = yield tornado.gen.Task(self.drain_events)
self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)
注意:在撰寫本文時,最新的py-amqp和kombu並不正式支持異步讀取。 [見此](https://github.com/celery/py-amqp/issues/25)。但是,對於異步消費,有[breadcrumbs](https://github.com/celery/kombu/blob/master/examples/experimental/async_consume.py) – Realistic