使用Kombu與RabbitMQ實現經典的發佈/訂閱設計模式。我已經創建,創建一個話題生產者:Kombu能否發佈並滿足多個消費者
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
with Connection('amqp://guest:[email protected]//') as conn:
producer = conn.Producer(serializer='json')
producer.publish('Hello World!',
exchange=media_exchange, routing_key='video',
declare=[video_queue])
我然後創建一個消費者從出版商消耗:
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print(body)
#message.ack()
with Connection('amqp://guest:[email protected]//') as conn:
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
conn.drain_events()
在隨後推出兩款消費者,每一個都在一個單獨的終端;都等待消息:
terminal 1: python consumer.py
terminal 2: python consumer.py
當我運行生產者時,只有一個消費者接收到消息。