2016-12-30 56 views
4

我在使用RabbitMQ頻道的消息,我希望一次可以消費n個元素。我想我可以使用ProcessPoolExecutor(或ThreadPoolExecutor)。 我只是想知道是否有可能知道是否有在池中自由執行。使用concurrent.futures一次消費很多出隊消息

這是我想寫什麼:

executor = futures.ProcessPoolExecutor(max_workers=5) 
running = [] 
def consume(message): 
    print "actually consuming a single message" 

def on_message(channel, method_frame, header_frame, message): 
    # this method is called once per incoming message 
    future = executor.submit(consume, message) 
    block_until_a_free_worker(executor, future) 

def block_until_a_free_worker(executor, future): 
    running.append(future) # this grows forever! 
    futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED) 

[...] 
channel.basic_consume(on_message, 'my_queue') 
channel.start_consuming() 

我需要寫函數block_until_a_free_worker。 這種方法應該能夠檢查所有正在運行的工作人員是否正在使用。

另外我可以使用任何阻止executor.submit選項,如果可用。

我嘗試了不同的方法,改變期貨與此同時,他們完成的列表。 我想明確地從列表中添加和刪除期貨,然後等待這樣的:

futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED) 

看來這不是一個解決方案。

我可以設置一個future.add_done_callback和possibily算上運行實例...

任何提示或想法? 謝謝。

+0

可能是一個解決方案,基於multiprocessing.Pool和信號量,用工人的數量初始化:http://stackoverflow.com/questions/9601802/python-pool-apply-async-and-map- async-do-not-block-on-full-queue – Zen

回答

2

我給了一個類似的答案here

信號量用於限制對一組工作人員的資源訪問。

from threading import Semaphore 
from concurrent.futures import ProcessPoolExecutor 

class TaskManager: 
    def __init__(self, workers): 
     self.pool = ProcessPoolExecutor(max_workers=workers) 
     self.workers = Semaphore(workers) 

    def new_task(self, function): 
     """Start a new task, blocks if all workers are busy.""" 
     self.workers.acquire() # flag a worker as busy 

     future = self.pool.submit(function, ...) 

     future.add_task_done(self.task_done) 

    def task_done(self, future): 
     """Called once task is done, releases one worker.""" 
     self.workers.release() 
+0

typo:def TaskManager - > class TaskManager。 – Zen

+0

固定...謝謝 – noxdafox