我使用py-amqplib來訪問Python中的RabbitMQ。應用程序會不時收到關於某些MQ主題的請求。如何使用py-amqplib等待多個隊列中的消息
它接收到這樣的請求時,它創建一個AMQP連接和一個信道,並開始一個新的線程來監聽消息的第一次:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener非常簡單:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
創建連接後,訂閱感興趣的主題,如下所示:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
第一次這一切正常。但是,它在後續請求訂閱另一個主題時失敗。在隨後的請求中,我重新使用AMQP連接和AMQPListener線程(因爲我不想爲每個主題啓動一個新線程),並且當我調用方法調用永遠不會返回的代碼塊上方的代碼塊時。我也試過在那個時候創建一個新的頻道,並且connection.channel()調用永遠不會返回。
我一直能夠使它工作的唯一方法是爲每個主題創建一個新的連接,通道和偵聽器線程(即routing_key),但這實際上並不理想。我懷疑這是wait()方法,它以某種方式阻止了整個連接,但我不知道該怎麼做。當然,我應該能夠使用單個監聽器線程接收帶有多個路由鍵(甚至在多個通道上)的消息?
一個相關的問題是:當該主題不再感興趣時,我該如何停止監聽線程?如果沒有消息,channel.wait()調用將永遠阻塞。我能想到的唯一方法是向隊列發送一條虛擬消息,以「毒化」它,即。被聽衆解釋爲停止的信號。
謝謝,但問題是我可能需要在任何時間訂閱新的主題,即。 ''channel.wait()'被調用後。 – EMP 2010-01-07 22:43:46