2013-05-03 55 views
3

我正在研究一個Python應用程序和後臺線程,用於從RabbitMQ隊列(主題方案)消費消息。Python Pika - 消費者進入線程

我在按鈕的on_click事件上啓動線程。 這裏是我的代碼,請關注「#self.receive_command()」。

def on_click_start_call(self,widget): 


    t_msg = threading.Thread(target=self.receive_command) 
    t_msg.start() 
    t_msg.join(0) 
    #self.receive_command() 


def receive_command(self): 

    syslog.syslog("ENTERED") 

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    syslog.syslog("1") 

    channel = connection.channel() 
    syslog.syslog("2") 

    channel.exchange_declare(exchange='STORE_CMD', type='topic') 
    syslog.syslog("3") 

    result = channel.queue_declare(exclusive=True) 
    syslog.syslog("4") 

    queue_name = result.method.queue 
    syslog.syslog("5") 

    def callback_rabbit(ch,method,properties,body): 
     syslog.syslog("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n") 

    syslog.syslog("6") 

    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key') 
    syslog.syslog("7") 

    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True) 
    syslog.syslog("8") 

    channel.start_consuming() 

如果我運行此代碼,我不能在syslog上看到消息1,2,3,5,6,7,8但我只能看到「已輸入」。所以,代碼被鎖定在pika.BlokingConnection上。

如果我運行相同的代碼(註釋線程指令和分解直接調用函數),所有作爲預期和消息都可以正確接收。

有什麼解決方案可以將消費者運行到線程中?

由於提前

達維德

回答

6

我過我的機器上的代碼,與鼠兔的最新版本。它工作正常。 Pika存在線程問題,但只要您爲每個線程創建一個連接,就不會成爲問題。

如果您遇到問題,很可能是因爲舊版Pika中的錯誤,或者與導致問題的線程無關的問題。

我會建議您避免0.9.13,因爲有多個錯誤,但 0.9.14 0.10.0應該很快就會發布。

[編輯] Pika 0.9.14已發佈。

這是我使用的代碼。

def receive_command(): 
    print("ENTERED") 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    print("1") 
    channel = connection.channel() 
    print("2") 
    channel.exchange_declare(exchange='STORE_CMD', type='topic') 
    print("3") 
    result = channel.queue_declare(exclusive=True) 
    print("4") 
    queue_name = result.method.queue 
    print("5") 
    def callback_rabbit(ch,method,properties,body): 
     print("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n") 
    print("6") 
    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key') 
    print("7") 
    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True) 
    print("8") 
    channel.start_consuming() 

def start(): 
    t_msg = threading.Thread(target=receive_command) 
    t_msg.start() 
    t_msg.join(0) 
    #self.receive_command() 
start() 
1

另一種方法是通過線程方法channel.start_consuming作爲目標,然後只是通過你的回調consume方法。 用法:consume(callback=your_method, queue=your_queue)

import threading 

def consume(self, *args, **kwargs): 
    if "channel" not in kwargs \ 
      or "callback" not in kwargs \ 
      or "queue" not in kwargs \ 
      or not callable(kwargs["callback"]): 
     return None 

    channel = kwargs["channel"] 
    callback = kwargs["callback"] 
    queue = kwargs["queue"] 
    channel.basic_consume(callback, queue=queue, no_ack=True) 

    t1 = threading.Thread(target=channel.start_consuming) 
    t1.start() 
    t1.join(0)