我是Python新手,正在Python 2.7中開發一個應用程序。我正在使用由concurrent.futures
庫提供的線程池。一旦來自ThreadPool
的線程啓動,它需要等待來自RabbitMQ的一些消息。如何從python中的線程池中喚醒線程?
我該如何在Python中實現這個邏輯才能使該線程從池中等待事件消息?基本上,當我收到來自RabbitMQ的消息時(即等待並通知ThreadPool
執行),我需要喚醒一個等待線程。
我是Python新手,正在Python 2.7中開發一個應用程序。我正在使用由concurrent.futures
庫提供的線程池。一旦來自ThreadPool
的線程啓動,它需要等待來自RabbitMQ的一些消息。如何從python中的線程池中喚醒線程?
我該如何在Python中實現這個邏輯才能使該線程從池中等待事件消息?基本上,當我收到來自RabbitMQ的消息時(即等待並通知ThreadPool
執行),我需要喚醒一個等待線程。
首先定義一個Queue
:
from Queue import Queue
q = Queue()
然後,在你的線程,你嘗試從隊列中取得一個項目:
msg = q.get()
這將阻止整個線程,直到有東西在隊列中找到。
def on_message(msg):
q.put(msg)
rabbitmq_channel.register_callback(on_message)
,或者如果你喜歡短:現在
,在同一時間,假設你的到來的事件被觸發回調,你註冊一個回調,簡單地把收到的RabbitMQ的消息隊列中來通知代碼:
rabbitmq_channel.register_callback(lambda msg: q.put(msg))
(上面是僞代碼,因爲我還沒有使用RabbitMQ的RabbitMQ的,也不任何Python綁定,但你應該能夠很容易地找出如何片斷適應您的實際應用程序代碼;關鍵部分要注意的是q.put(msg)
- 只要確認通知了新消息,就立即調用該部分。)
只要發生這種情況,線程就會被喚醒並可以自由處理消息。爲了重用多個消息在同一個線程,只需使用一個while
循環:
while True:
msg = q.get()
process_message(msg)
附:我會建議看看Gevent以及如何將它與RabbitMQ結合在您的Python應用程序中,以便能夠擺脫線程並使用更輕量級和可擴展的綠色線程機制,而無需管理線程池(因爲您可以擁有成千上萬的greenlet在飛行中產生並殺死):
# this thing always called in a green thread; forget about pools and queues.
def on_message(msg):
# you're in a green thread now; just process away!
benefit_from("all the gevent goodness!")
spawn_and_join_10_sub_greenlets()
rabbitmq_channel.register_callback(lambda msg: gevent.spawn(on_message, msg))
另外考慮'threading.Event'是你不需要處理消息。 – Veedrac
@Erik完全同意你的看法。但是,如果池中有超過100個線程,則需要保持相同數量的調用回調方法被調用的隊列對象。我正在考慮採用更優化的方式來處理這種情況,即更多數量的隊列不需要維護。 – Mandy
@Veedrac與線程相同的情況,我猜。我需要保持這麼多的事件。 – Mandy