2017-10-14 53 views
1

我想設置一些接受輸入並處理它的進程,這個結果的結果是我想要處理的另一個任務。實質上,每個任務都會導致零個或多個新任務(相同類型),最終所有任務都不會產生新任務。當所有進程都試圖從隊列中取出並且隊列爲空時結束處理?

我認爲一個隊列對此很有用,所以我有一個輸入隊列和一個結果隊列來添加任何新的任務。任何時候,隊列可能都是空的,但如果另一個進程正在處理某個任務,則可能會添加更多隊列。

因此,我只希望它結束​​時,所有進程都同時嘗試從輸入隊列中獲取。

我對Python多處理和一般多處理都是全新的。

編輯補充我的意思的基本概述:

class Consumer(Process): 
    def __init__(self, name): 
     super().__init__(name=name) 

    def run(): 
     # This is where I would have the task try to get a new task off of the 
     # queue and then calculate the results and put them into the queue 
     # After which it would then try to get a new task and repeat 

     # If this an all other processes are trying to get and the queue is 
     # empty That is the only time I know that everything is complete and can 
     # continue 
     pass 

def start_processing(): 
    in_queue = Queue() 
    results_queue = Queue() 
    consumers = [Consumer(str(i)) for i in range(cpu_count())] 

    for i in consumers: 
     i.start() 

    # Wait for the above mentioned conditions to be true before continuing 
+0

在這一點上我只有絕對的框架代碼,因爲這看起來像是我需要工作之前,我向前走。基本上我創建了進程和隊列。 –

+0

我已添加一些概述代碼 –

回答

1

JoinableQueue已設計,以適應這一目的。加入JoinableQueue將會阻塞,直到有任務正在進行。

您可以如下使用它:主進程會產生一定數量的工作進程,將其分配給他們JoinableQueue。工作進程將使用隊列來產生和消耗新的任務。主進程將通過加入隊列等待,直到沒有更多的任務正在進行。之後,它將終止工作進程並退出。

非常簡單的例子(僞):

def consumer(queue): 
    for task in queue.get(): 
     results = process_task(task) 

     if 'more_tasks' in results: 
      for new_task in results['more_tasks']: 
       queue.put(new_task) 

     # signal the queue that a task has been completed 
     queue.task_done() 

def main(): 
    queue = JoinableQueue() 

    processes = start_processes(consumer, queue) 

    for task in initial_tasks: 
     queue.put(task) 

    queue.join() # block until all work is done 

    terminate_processes(processes)