2017-06-28 51 views
0

問題:我有一個DAG(Directed-acyclic-graph)結構,用於啓動機器上某些海量數據處理的執行。有些進程只能在父數據處理完成時才能啓動,因爲存在多級處理。我想使用python多處理庫來處理它的一臺機器上的所有內容,作爲第一個目標,然後使用管理器在不同的機器上執行。我沒有以前的python多處理經驗。任何人都可以建議,如果這是一個好的圖書館開始?如果是的話,一些基本的實施想法會很好。如果沒有,還有什麼可以用來在Python中做這件事情?使用python多重處理在異步中啓動大量依賴進程

實施例:

A - >乙

乙 - > d,E,F,G

Ç - > d

在上述例子中我想踢甲& C首先(並行),在成功執行之後,其他剩餘的進程只會等待B先完成。一旦B完成執行,所有其他進程將開始。

P.S .:對不起,我不能分享實際數據,因爲機密,但我試圖使用該示例來清除。

回答

1

我很喜歡使用進程和隊列這樣的東西。

像這樣:

from multiprocessing import Process, Queue 
from Queue import Empty as QueueEmpty 
import time 

#example process functions 
def processA(queueA, queueB): 
    while True: 
     try: 
      data = queueA.get_nowait() 
      if data == 'END': 
       break 
     except QueueEmpty: 
      time.sleep(2) #wait some time for data to enter queue 
      continue 
     #do stuff with data 
     queueB.put(data) 

def processA(queueB, _): 
    while True: 
     try: 
      data = queueB.get_nowait() 
      if data == 'END': 
       break 
     except QueueEmpty: 
      time.sleep(2) #wait some time for data to enter queue 
      continue 
     #do stuff with data 

#helper functions for starting and stopping processes 
def start_procs(num_workers, target_function, args): 
    procs = [] 
    for _ in range(num_workers): 
     p = Process(target=target_function, args=args) 
     p.start() 
     procs.append(p) 
    return procs 

def shutdown_process(proc_lst, queue): 
    for _ in proc_lst: 
     queue.put('END') 
    for p in proc_lst: 
     try: 
      p.join() 
     except KeyboardInterrupt: 
      break 

queueA = Queue(<size of queue> * 3) #needs to be a bit bigger than actual. 3x works well for me 
queueB = Queue(<size of queue>) 
queueC = Queue(<size of queue>) 
queueD = Queue(<size of queue>) 

procsA = start_procs(number_of_workers, processA, (queueA, queueB)) 
procsB = start_procs(number_of_workers, processB, (queueB, None)) 

# feed some data to processA 
[queueA.put(data) for data in start_data] 

#shutdown processes 
shutdown_process(procsA, queueA) 
shutdown_process(procsB, queueB) 

#etc, etc. You could arrange the start, stop, and data feed statements to arrive at the dag behaviour you desire 
+0

這是一個很好的例子。但考慮到數據可能具有的依賴性數量,擁有Queue是否是一個好主意?我認爲,一羣工人應該更好,有什麼建議? –

+0

只要你不關心併發...... – Sebastian

+0

在DAG的情況下,一個進程可以有多個父進程。這讓我懷疑爲每個進程維護單獨隊列的有效性。它可能無法正常工作。有時候,這個計劃將不得不等待所有的家長完成一個。 –

相關問題