2012-06-15 24 views
3

我需要在交換機上設置多個隊列。我想創建一個連接,然後聲明多個隊列(這是有效的),然後在多個隊列上發佈消息(這是行不通的)。Pika basic_publish在多個隊列上發佈時會掛起

我設置了一些測試代碼來做到這一點,但它每次都會在第二次發佈時掛起。我認爲它不喜歡在沒有關閉連接的情況下在多個隊列上發佈,因爲當我在單個隊列上發佈時(甚至在單個隊列上發送多個消息),此代碼可以正常工作。

有什麼我需要添加使這項工作?我真的很想不必關閉發佈之間的聯繫。另外,當我有消費者時,當我在發送多個隊列時發送給basic_publish()的時候,他們什麼都看不到。當我在單個隊列上發佈時,我確實看到消息幾乎立即出現。

#!/usr/bin/env python 
import pika 


queue_names = ['1a', '2b', '3c', '4d'] 


# Variables to hold our connection and channel 
connection = None 
channel = None 


# Called when our connection to RabbitMQ is closed 
def on_closed(frame): 
    global connection 
    # connection.ioloop is blocking, this will stop and exit the app 
    connection.ioloop.stop() 



def on_connected(connection): 
    """ 
    Called when we have connected to RabbitMQ 
    This creates a channel on the connection 
    """ 
    global channel #TODO: Test removing this global call 

    connection.add_on_close_callback(on_closed) 

    # Create a channel on our connection passing the on_channel_open callback 
    connection.channel(on_channel_open) 



def on_channel_open(channel_): 
    """ 
    Called when channel opened 
    Declare a queue on the channel 
    """ 
    global channel 

    # Our usable channel has been passed to us, assign it for future use 
    channel = channel_ 


    # Declare a set of queues on this channel 
    for queue_name in reversed(queue_names): 
     channel.queue_declare(queue=queue_name, durable=True, 
           exclusive=False, auto_delete=False, 
           callback=on_queue_declared) 
     #print "done making hash" 

def on_queue_declared(frame): 
    """ 
    Called when a queue is declared 
    """ 
    global channel 

    print "Sending 'Hello World!' on ", frame.method.queue 

    # Send a message 
    channel.basic_publish(exchange='', 
          routing_key=frame.method.queue, 
          body='Hello World!') 


# Create our connection parameters and connect to RabbitMQ 
connection = pika.SelectConnection(pika.ConnectionParameters('localhost'), \ 
            on_connected) 

# Start our IO/Event loop 
try: 
    connection.ioloop.start() 
except KeyboardInterrupt: 
    print "interrupt" 
    # Gracefully close the connection 
    connection.close() 
    # Loop until we're fully closed, will stop on its own 
    #connection.ioloop.start() 

回答

2

我的解決方案是有一個變量跟蹤我的所有隊列是否被聲明。

在on_queue_declared()中,我會檢查這個變量,如果我所有的隊列都被聲明瞭,那麼我開始發佈消息。我相信在返回所有Queue.DeclareOks之前試圖發佈消息導致了這些問題。