2012-12-20 41 views
1

我使用rabbitmq作爲amqp服務器(clustering)和kombu + py-amqp作爲amqp client.i可以發送正常消息隊列的/ recv消息,但我不知道如何爲鏡像隊列發送/ recv消息,並且我可以沒有找到答案與谷歌。如何發送/ recv鏡像隊列消息?如何在python3中使用amqp鏡像隊列?

我的代碼:

with Connection(hostname='192.168.1.10',userid='test',password='test',virtual_host='test') as conn: 
     simple_queue = conn.SimpleQueue('test') 

,並得到例外

File "./test_amqp.py", line 38, in send 
    simple_queue = conn.SimpleQueue('test') 
    File "/usr/local/lib/python3.2/site-packages/kombu/connection.py", line 671, in SimpleQueue 
    exchange_opts, **kwargs) 
    File "/usr/local/lib/python3.2/site-packages/kombu/simple.py", line 122, in __init__ 
    consumer = messaging.Consumer(channel, queue) 
    File "/usr/local/lib/python3.2/site-packages/kombu/messaging.py", line 338, in __init__ 
    self.revive(self.channel) 
    File "/usr/local/lib/python3.2/site-packages/kombu/messaging.py", line 350, in revive 
    self.declare() 
    File "/usr/local/lib/python3.2/site-packages/kombu/messaging.py", line 360, in declare 
    queue.declare() 
    File "/usr/local/lib/python3.2/site-packages/kombu/entity.py", line 471, in declare 
    self.queue_declare(nowait, passive=False) 
    File "/usr/local/lib/python3.2/site-packages/kombu/entity.py", line 497, in queue_declare 
    nowait=nowait) 
    File "/usr/local/lib/python3.2/site-packages/amqp/channel.py", line 1240, in queue_declare 
    (50, 11), # Channel.queue_declare_ok 
    File "/usr/local/lib/python3.2/site-packages/amqp/abstract_channel.py", line 70, in wait 
    return self.dispatch_method(method_sig, args, content) 
    File "/usr/local/lib/python3.2/site-packages/amqp/abstract_channel.py", line 88, in dispatch_method 
    return amqp_method(self, args) 
    File "/usr/local/lib/python3.2/site-packages/amqp/channel.py", line 222, in _close 
    (class_id, method_id), ChannelError) 
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-ha-policy'for queue 'smarton' in vhost 'smarton': received none but current is the value 'all' of type 'longstr' 

回答

0

也許有一些與kombu prolem,參數:{「x-ha-policy」:「all」}應該發佈到entity.Queue.queue_arguments,但是threre沒有entity.Queue方法設置值entity.Queue.queue_arguments的,我改變了kombu.simple.SimpleQueue,並得到正確的結果:

112   if not isinstance(queue, entity.Queue): 
113    exchange = entity.Exchange(name, 'direct', **exchange_opts) 
114    queue = entity.Queue(name, exchange, name, **queue_opts) 
115+   queue.queue_arguments={'x-ha-policy':'all'} 
116   else: 
117    name = queue.name 
118    exchange = queue.exchange 
0

當你聲明一個隊列,所有的選項都必須等同於已經存在服務器上的一個。在這種情況下,服務器的額外選項是'x-ha-policy': 'all'

嘗試simple_queue = conn.SimpleQueue('test', queue_opts={"x-ha-policy": "all"})

我沒有測試過這一點,但我認爲它會工作。

+0

我知道這一點,我找不到在哪裏把{「X-HA-政策」:「所有」 } – xielingyun

+0

queue_opts = {「x-ha-policy」:「all」}不起作用 – xielingyun