2009-06-11 117 views
2

我試圖將系統從使用病態移動到rabbitmq,但似乎無法獲得默認情況下提供的相同廣播行爲morbid。通過廣播我的意思是,當消息被添加到隊列中時,每個消費者都會收到它。使用兔子時,當添加消息時,它們以循環方式分配給每個聽衆。用Rabbitmq和Python重播廣播

誰能告訴我如何實現同樣的消息分佈?

下面使用的蹬地庫是http://code.google.com/p/stomppy/

如果不能夠做到與跺腳,甚至amqplib例如將真正幫助。

我目前的代碼如下所示

消費者

import stomp 

class MyListener(object): 
    def on_error(self, headers, message): 
     print 'recieved an error %s' % message 

    def on_message(self, headers, message): 
     print 'recieved a message %s' % message 

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password') 
conn.set_listener('', MyListener()) 
conn.start() 
conn.connect(username="user", password="password") 
headers = {} 

conn.subscribe(destination='/topic/demoqueue', ack='auto') 

while True: 
    pass 
conn.disconnect() 

和發件人看起來像這樣

import stomp 

class MyListener(object): 
    def on_error(self, headers, message): 
     print 'recieved an error %s' % message 

    def on_message(self, headers, message): 
     print 'recieved a message %s' % message 

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password') 
conn.set_listener('', MyListener()) 
conn.start() 
conn.connect(username="user", password="password") 
headers = {} 

conn.subscribe(destination='/topic/demotopic', ack='auto') 

while True: 
    pass 
conn.disconnect() 

回答

3

我終於想通過爲每個「接收組」創建一個交換來做到這一點,我不知道兔子會如何處理數千個交換,所以你可能想要在生產中試用它之前仔細測試它

在發送代碼:

conn.send(str(i), exchange=exchange, destination='') 

是必需的空白目標,我所關心的是發送到交換

要收到

import stomp 
import sys 
from amqplib import client_0_8 as amqp 
#read in the exchange name so I can set up multiple recievers for different exchanges to tset 
exchange = sys.argv[1] 
conn = amqp.Connection(host="localhost:5672", userid="username", password="password", 
virtual_host="/", insist=False) 

chan = conn.channel() 

chan.access_request('/', active=True, write=True, read=True) 

#declare my exchange 
chan.exchange_declare(exchange, 'topic') 
#not passing a queue name means I get a new unique one back 
qname,_,_ = chan.queue_declare() 
#bind the queue to the exchange 
chan.queue_bind(qname, exchange=exchange) 

class MyListener(object): 
    def on_error(self, headers, message): 
     print 'recieved an error %s' % message 

    def on_message(self, headers, message): 
     print 'recieved a message %s' % message 

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser') 
conn.set_listener('', MyListener()) 
conn.start() 
conn.connect(username="username", password="password") 
headers = {} 

#subscribe to the queue 
conn.subscribe(destination=qname, ack='auto') 

while True: 
    pass 
conn.disconnect() 
3

顯然,你不能直接與STOMP做;有一個mailing list thread,它顯示了你必須跳過的所有喧囂才能進行廣播(包括一些較低級別的AMPQ內容)。

+0

謝謝,我已經看過線程,並試圖用amqplib實現它的建議沒有成功。觸及它的具體信息是http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2008-September/001786.html。我已經更新了這個問題,以反映一個amqplib樣本會爲我完成這項工作。 – 2009-06-11 15:44:28