2009-11-27 43 views
3

我使用py-amqplib來訪問Python中的RabbitMQ。應用程序會不時收到關於某些MQ主題的請求。如何使用py-amqplib等待多個隊列中的消息

它接收到這樣的請求時,它創建一個AMQP連接和一個信道,並開始一個新的線程來監聽消息的第一次:

connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False) 
    channel = connection.channel() 

    listener = AMQPListener(channel) 
    listener.start() 

AMQPListener非常簡單:

class AMQPListener(threading.Thread): 
    def __init__(self, channel): 
     threading.Thread.__init__(self) 
     self.__channel = channel 

    def run(self): 
     while True: 
      self.__channel.wait() 

創建連接後,訂閱感興趣的主題,如下所示:

channel.queue_declare(queue = queueName, exclusive = False) 
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True) 
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination) 

def receive_callback(msg): 
    self.queue.put(msg.body) 

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback) 

第一次這一切正常。但是,它在後續請求訂閱另一個主題時失敗。在隨後的請求中,我重新使用AMQP連接和AMQPListener線程(因爲我不想爲每個主題啓動一個新線程),並且當我調用方法調用永遠不會返回的代碼塊上方的代碼塊時。我也試過在那個時候創建​​一個新的頻道,並且connection.channel()調用永遠不會返回。

我一直能夠使它工作的唯一方法是爲每個主題創建一個新的連接,通道和偵聽器線程(即routing_key),但這實際上並不理想。我懷疑這是wait()方法,它以某種方式阻止了整個連接,但我不知道該怎麼做。當然,我應該能夠使用單個監聽器線程接收帶有多個路由鍵(甚至在多個通道上)的消息?

一個相關的問題是:當該主題不再感興趣時,我該如何停止監聽線程?如果沒有消息,channel.wait()調用將永遠阻塞。我能想到的唯一方法是向隊列發送一條虛擬消息,以「毒化」它,即。被聽衆解釋爲停止的信號。

回答

1

如果你想每信道超過一個comsumer只重視使用basic_consume另一個()和使用channel.wait()後。它將監聽通過basic_consume()附加的所有隊列。確保您爲每個basic_consume()定義不同的消費者標籤。

使用channel.basic_cancel(consumer_tag)如果要取消一個特定的消費上的隊列(取消收聽特定主題)。

+0

謝謝,但問題是我可能需要在任何時間訂閱新的主題,即。 ''channel.wait()'被調用後。 – EMP 2010-01-07 22:43:46