我在使用RabbitMQ頻道的消息,我希望一次可以消費n個元素。我想我可以使用ProcessPoolExecutor(或ThreadPoolExecutor)。 我只是想知道是否有可能知道是否有在池中自由執行。使用concurrent.futures一次消費很多出隊消息
這是我想寫什麼:
executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
print "actually consuming a single message"
def on_message(channel, method_frame, header_frame, message):
# this method is called once per incoming message
future = executor.submit(consume, message)
block_until_a_free_worker(executor, future)
def block_until_a_free_worker(executor, future):
running.append(future) # this grows forever!
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()
我需要寫函數block_until_a_free_worker。 這種方法應該能夠檢查所有正在運行的工作人員是否正在使用。
另外我可以使用任何阻止executor.submit選項,如果可用。
我嘗試了不同的方法,改變期貨與此同時,他們完成的列表。 我想明確地從列表中添加和刪除期貨,然後等待這樣的:
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
看來這不是一個解決方案。
我可以設置一個future.add_done_callback和possibily算上運行實例...
任何提示或想法? 謝謝。
可能是一個解決方案,基於multiprocessing.Pool和信號量,用工人的數量初始化:http://stackoverflow.com/questions/9601802/python-pool-apply-async-and-map- async-do-not-block-on-full-queue – Zen