2013-09-25 107 views
2

我是Python新手,正在Python 2.7中開發一個應用程序。我正在使用由concurrent.futures庫提供的線程池。一旦來自ThreadPool的線程啓動,它需要等待來自RabbitMQ的一些消息。如何從python中的線程池中喚醒線程?

我該如何在Python中實現這個邏輯才能使該線程從池中等待事件消息?基本上,當我收到來自RabbitMQ的消息時(即等待並通知ThreadPool執行),我需要喚醒一個等待線程。

回答

3

首先定義一個Queue

from Queue import Queue 

q = Queue() 

然後,在你的線程,你嘗試從隊列中取得一個項目:

msg = q.get() 

這將阻止整個線程,直到有東西在隊列中找到。

def on_message(msg): 
    q.put(msg) 
rabbitmq_channel.register_callback(on_message) 

,或者如果你喜歡短:現在

,在同一時間,假設你的到來的事件被觸發回調,你註冊一個回調,簡單地把收到的RabbitMQ的消息隊列中來通知代碼:

rabbitmq_channel.register_callback(lambda msg: q.put(msg)) 

(上面是僞代碼,因爲我還沒有使用RabbitMQ的RabbitMQ的,也不任何Python綁定,但你應該能夠很容易地找出如何片斷適應您的實際應用程序代碼;關鍵部分要注意的是q.put(msg) - 只要確認通知了新消息,就立即調用該部分。)

只要發生這種情況,線程就會被喚醒並可以自由處理消息。爲了重用多個消息在同一個線程,只需使用一個while循環:

while True: 
    msg = q.get() 
    process_message(msg) 

附:我會建議看看Gevent以及如何將它與RabbitMQ結合在您的Python應用程序中,以便能夠擺脫線程並使用更輕量級和可擴展的綠色線程機制,而無需管理線程池(因爲您可以擁有成千上萬的greenlet在飛行中產生並殺死):

# this thing always called in a green thread; forget about pools and queues. 
def on_message(msg): 
    # you're in a green thread now; just process away! 
    benefit_from("all the gevent goodness!") 
    spawn_and_join_10_sub_greenlets() 

rabbitmq_channel.register_callback(lambda msg: gevent.spawn(on_message, msg)) 
+0

另外考慮'threading.Event'是你不需要處理消息。 – Veedrac

+0

@Erik完全同意你的看法。但是,如果池中有超過100個線程,則需要保持相同數量的調用回調方法被調用的隊列對象。我正在考慮採用更優化的方式來處理這種情況,即更多數量的隊列不需要維護。 – Mandy

+0

@Veedrac與線程相同的情況,我猜。我需要保持這麼多的事件。 – Mandy