0
我需要建立一個能夠以編程方式控制的Kombu消費者。我見過的所有例子都只是告訴你使用ctrl-c來停止程序。Kombu消費者作爲節儉服務
我的主要應用程序是作爲一個扭曲Thrift服務運行,我想我可以以某種方式使用Twisted reactor來處理我的消費者內的eventloop,但我無法弄清楚如何。
這是我的消費類。 start_consuming()部分是好的,除了它是阻塞的,我不能從外部調用stop_consuming()。
from kombu import BrokerConnection, Exchange, eventloop, Queue, Consumer
class DMS():
__routing_key = None
__is_consuming = None
__message_counter = 0
def __init__(self, routing_key):
print 'server: __init__()'
self.__routing_key = routing_key
def __handle_message(self, body, message):
self.__message_counter += 1
# Print count every 10,000 messsages.
if (self.__message_counter % 10000) == 0:
print self.__message_counter
def start_consuming(self):
print 'server: start_consuming()'
self.__is_consuming = True
exchange = Exchange('raven-exchange', type='topic', durable=False)
queue = Queue(self.__routing_key, exchange, routing_key=self.__routing_key)
with BrokerConnection('amqp://guest:[email protected]:5672//') as connection:
with Consumer(connection, queue, callbacks=[self.__handle_message]) as consumer:
for _ in eventloop(connection):
if self.__is_consuming:
pass
else:
break
consumer.cancel()
connection.close()
def stop_consuming(self):
print 'server: stop_consuming()'
self.__is_consuming = False