2013-02-19 24 views
1

我希望能夠檢測在向AMQP提交消息時交換是否不存在。如何檢測交換不存在與py-amqp

考慮下面的例子。

#!/usr/bin/python 

import amqp 
from time import sleep 

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/") 
outgoing = conn.channel() 
message = amqp.Message("x") 


while True: 
    print "publish message." 
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar") 
    sleep(1) 

該腳本不斷向發佈者發佈,但如果交換不存在,則不會引發任何錯誤。交換存在時,消息到達。

#!/usr/bin/python 

import amqp 
from time import sleep 

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/") 
outgoing = conn.channel() 
message = amqp.Message("x") 


while True: 
    print "publish message." 
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar") 
    outgoing.wait() 
    sleep(1) 

當我添加outgoing.wait()一個amqp.exceptions.NotFound引發這就是我想要的。但問題是,如果在這種情況下交換存在,消息到達,但outgoing.wait()阻止我的循環。 (我可以在單獨的線程中運行outgoing.wait(),但我不想這樣做。)

如何處理此問題?

任何意見建議歡迎指針

感謝,

周杰倫

回答

1

不幸的是,你需要一個阻塞調用檢查來自basic_publish異常()。你可以做什麼,但一旦進入異步循環之前運行阻塞調用:

# send a test message synchronously to see if the exchange exists 
test_message = amqp.Message('exchange_test') 
outgoing.basic_publish(test_message,exchange="non-existing",routing_key="fubar") 
try:  
    outgoing.wait() 
except amqp.exceptions.NotFound: 
    # could not find the exchange, so do something about it 
    exit() 

while True: 
    # fairly certain the exchange exists now, run the async loop 
    print "publish message." 
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar") 
    sleep(1) 
2

如果你想了解是否有交換存在,使用exchange_declare方法和被動標誌設置爲True。將被動標誌設置爲True將防止服務器嘗試創建交換,並在交換不存在時拋出錯誤。

import amqp 
from amqp.exceptions import NotFound 

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", 
         virtual_host="/") 
outgoing = conn.channel() 
try: 
    outgoing.exchange_declare("fubar", "", passive=True) 
except NotFound: 
    print "Exchange 'fubar' does not exist!" 

如果你是在是確保Exchange發佈它之前就存在真正感興趣的,只是你拖放到你發送循環前聲明。如果交換已經存在,則什麼都不會發生。如果交換不存在,它將被創建。

import amqp 

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", 
         virtual_host="/") 
outgoing = conn.channel() 
outgoing.exchange_declare("fubar", "direct") 

這裏是在AMQP庫的方法聲明的鏈接exchange_declare你使用:

https://github.com/celery/py-amqp/blob/master/amqp/channel.py#L460-L461