2014-07-23 21 views
3

使用Kombu與RabbitMQ實現經典的發佈/訂閱設計模式。我已經創建,創建一個話題生產者:Kombu能否發佈並滿足多個消費者

from kombu import Connection, Exchange, Queue 

media_exchange = Exchange('media', 'topic', durable=False) 
video_queue = Queue('video', exchange=media_exchange, routing_key='video') 

with Connection('amqp://guest:[email protected]//') as conn: 
    producer = conn.Producer(serializer='json') 
    producer.publish('Hello World!', 
         exchange=media_exchange, routing_key='video', 
         declare=[video_queue]) 

我然後創建一個消費者從出版商消耗:

from kombu import Connection, Exchange, Queue 

media_exchange = Exchange('media', type='topic', durable=False) 
video_queue = Queue('video', exchange=media_exchange, routing_key='video') 

def process_media(body, message): 
    print(body) 
    #message.ack() 

with Connection('amqp://guest:[email protected]//') as conn: 
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: 
     # Process messages and handle events on all channels 
     while True: 
      conn.drain_events() 

在隨後推出兩款消費者,每一個都在一個單獨的終端;都等待消息:

terminal 1: python consumer.py 
terminal 2: python consumer.py 

當我運行生產者時,只有一個消費者接收到消息。

回答

3

生產者在交換中發佈,而不是排隊。隊列由消費者定義。當爲每個消費者使用不同的隊列名稱時,所有人都會收到消息。 當爲同一隊列使用許多消費者時,它就是負載均衡,這就是爲什麼只有一個消費者獲得消息。

0

爲了澄清,隊列中的消息是「消耗的」,即第一個消費者消耗它,並且消息不在隊列中,這就是爲什麼第二個消費者沒有得到任何東西。

有2名單獨的消費者對於相同的消息 - 使用2個獨立的隊列即 video_queue1video_queue2,宣告並綁定到交換media_exchange,使用相同的密鑰video

producer.py

from kombu import Connection, Exchange, Queue 

media_exchange = Exchange('media', 'topic', durable=False) 
video_queue1 = Queue('video1', exchange=media_exchange, routing_key='video') 
video_queue2 = Queue('video2', exchange=media_exchange, routing_key='video') 


with Connection('amqp://guest:[email protected]//') as conn: 
    producer = conn.Producer(serializer='json') 
    producer.publish('Hello World!', 
         exchange=media_exchange, routing_key='video', 
         declare=[video_queue1, video_queue2]) 

consumer1.py

from kombu import Connection, Exchange, Queue 

media_exchange = Exchange('media', type='topic', durable=False) 
video_queue = Queue('video1', exchange=media_exchange, routing_key='video') 

def process_media(body, message): 
    print(body) 
    #message.ack() 

with Connection('amqp://guest:[email protected]//') as conn: 
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: 
     # Process messages and handle events on all channels 
     while True: 
      conn.drain_events() 

consumer2.py

from kombu import Connection, Exchange, Queue 

media_exchange = Exchange('media', type='topic', durable=False) 
video_queue = Queue('video2', exchange=media_exchange, routing_key='video') 

def process_media(body, message): 
    print(body) 
    #message.ack() 

with Connection('amqp://guest:[email protected]//') as conn: 
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: 
     # Process messages and handle events on all channels 
     while True: 
      conn.drain_events() 
相關問題