2014-02-19 194 views
8

我在Python中使用RabbitMQ來管理生產者和多個消費者之間的幾個隊列。在RabbitMQ網站(routing model)的示例中,消費者被阻止。這意味着它們在start_consuming()處停止,並在每次隊列中有新的「任務」時執行回調函數。RabbitMQ非阻塞消費者

我的問題是:我如何以他仍然在等待任務的方式實現我的客戶(所以,每當有新事物在隊列中時都會調用回調函數),但同時他可以執行其他工作/代碼。

謝謝

+1

爲什麼不在單獨的線程上運行'start_consuming()'? – goncalopp

+1

那麼,我有一個解決方案很容易。我可以簡單地在函數中使用basic_get,並每隔X秒調用一次該函數,而不是使用basic_consume。但是有一個問題:排隊任務是否按照某種順序交付? –

+1

@HoooSousa如果你可以在這裏發佈一個完整的解決方案將是偉大的 - 我是rabbitmq新手,它會真正幫助其他人。 – drozzy

回答

-2

接收機

import pika 

messages = [] 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='message') 

def callback(ch, method, properties, message): 
    print(message) 
    messages.append(message) 

channel.basic_consume(callback,queue='message',no_ack=True) 

channel.basic_consume(callback,queue='message',no_ack=True) 

當你需要) 或線程

import threading 

import pika 
import time 

messages = [] 

def recieve_messages(): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(
     'localhost')) 
    channel = connection.channel() 
    channel.queue_declare(queue='hello') 

    def callback(ch, method, properties, body): 
     messages.append(body) 

    channel.basic_consume(callback, 
          queue='hello', 
          no_ack=True) 
    # channel.start_consuming() 
    mq_recieve_thread = threading.Thread(target=channel.start_consuming) 
    mq_recieve_thread.start() 

recieve_messages() 
while True: 
    print messages 
    time.sleep(1) 
+1

Pika不是線程安全的 - API不支持此功能 –