I really like using processes and queues for this kind of thing, like this:
from multiprocessing import Process, Queue
from queue import Empty as QueueEmpty  # Python 3; on Python 2 this module was named Queue
import time
# example process functions
def processA(queueA, queueB):
    while True:
        try:
            data = queueA.get_nowait()
        except QueueEmpty:
            time.sleep(2)  # wait some time for data to enter the queue
            continue
        if data == 'END':
            break
        # do stuff with data
        queueB.put(data)


def processB(queueB, _):
    while True:
        try:
            data = queueB.get_nowait()
        except QueueEmpty:
            time.sleep(2)  # wait some time for data to enter the queue
            continue
        if data == 'END':
            break
        # do stuff with data
# helper functions for starting and stopping processes
def start_procs(num_workers, target_function, args):
    procs = []
    for _ in range(num_workers):
        p = Process(target=target_function, args=args)
        p.start()
        procs.append(p)
    return procs


def shutdown_process(proc_lst, queue):
    # one 'END' sentinel per worker, so every worker in the group exits
    for _ in proc_lst:
        queue.put('END')
    for p in proc_lst:
        try:
            p.join()
        except KeyboardInterrupt:
            break
if __name__ == '__main__':
    # Placeholder values (illustrative only): set these for your workload
    queue_size = 100
    number_of_workers = 4
    start_data = range(100)

    queueA = Queue(queue_size * 3)  # needs to be a bit bigger than actual; 3x works well for me
    queueB = Queue(queue_size)
    queueC = Queue(queue_size)
    queueD = Queue(queue_size)

    procsA = start_procs(number_of_workers, processA, (queueA, queueB))
    procsB = start_procs(number_of_workers, processB, (queueB, None))

    # feed some data to processA
    for data in start_data:
        queueA.put(data)

    # shut down processes: stopping the upstream stage first means every item
    # from processA is already in queueB before processB receives 'END'
    shutdown_process(procsA, queueA)
    shutdown_process(procsB, queueB)
    # etc., etc. You could arrange the start, stop, and data-feed statements to arrive at the DAG behaviour you desire
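A possible refinement, not part of the original answer: the workers above poll with get_nowait() and sleep for 2 seconds whenever their queue is empty, which can add up to 2 s of latency per stage. A blocking get() waits exactly as long as needed. The sketch below uses a hypothetical helper, stage(), that applies a function to each item until the 'END' sentinel arrives. queue.Queue is used only for the in-process demo; multiprocessing.Queue offers the same get()/put() calls, so stage can be passed as a Process target. In that case func should be a module-level function, since lambdas cannot be pickled under the spawn start method (the default on Windows and macOS).

```python
from queue import Queue  # multiprocessing.Queue exposes the same get()/put() calls

SENTINEL = 'END'


def stage(in_q, out_q, func):
    """Hypothetical helper: apply func to each item until SENTINEL arrives."""
    # iter(callable, sentinel) keeps calling in_q.get() and stops when it
    # returns SENTINEL. get() blocks while the queue is empty, so no sleep is needed.
    for item in iter(in_q.get, SENTINEL):
        out_q.put(func(item))


if __name__ == '__main__':
    q_in, q_out = Queue(), Queue()
    for x in [1, 2, 3]:
        q_in.put(x)
    q_in.put(SENTINEL)
    stage(q_in, q_out, lambda x: x * 10)
    print([q_out.get() for _ in range(3)])  # [10, 20, 30]
```

The sentinel test uses ==, just like the original `data == 'END'`, so items whose == does not return a plain bool (NumPy arrays, for example) would need a different end marker.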
This is a nice example. But given how many dependencies the data may have, is using Queues a good idea? I think a pool of workers would be better. Any suggestions?
As long as you don't care about concurrency... – Sebastian
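On the pool-of-workers question, one alternative (an assumption, not something proposed in the thread) is to drop the per-stage queues and let a single executor run each node as soon as all of its parents have finished. The sketch assumes Python 3.9+ for graphlib; run_dag, graph and tasks are hypothetical names. The demo passes a ThreadPoolExecutor to keep it simple. For CPU-bound nodes a ProcessPoolExecutor can be passed instead, but then the task functions must be module-level (picklable) rather than lambdas.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter  # Python 3.9+


def run_dag(graph, tasks, executor):
    """Hypothetical scheduler: run each node once all of its parents are done.

    graph maps node -> tuple of parent nodes (the predecessor format that
    graphlib.TopologicalSorter expects); tasks maps node -> callable that
    takes the parents' results in that order. Returns {node: result}.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if graph is not a DAG
    results = {}
    running = {}  # future -> node
    while sorter.is_active():
        # Submit every node whose parents have all finished.
        for node in sorter.get_ready():
            args = [results[p] for p in graph.get(node, ())]
            running[executor.submit(tasks[node], *args)] = node
        # Wait for at least one task, then mark it done to unlock its children.
        finished, _ = wait(running, return_when=FIRST_COMPLETED)
        for fut in finished:
            node = running.pop(fut)
            results[node] = fut.result()
            sorter.done(node)
    return results


if __name__ == '__main__':
    # Diamond DAG: A -> B, A -> C, (B, C) -> D
    graph = {'B': ('A',), 'C': ('A',), 'D': ('B', 'C')}
    tasks = {
        'A': lambda: 1,
        'B': lambda a: a + 1,
        'C': lambda a: a * 10,
        'D': lambda b, c: b + c,
    }
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(sorted(run_dag(graph, tasks, pool).items()))
    # [('A', 1), ('B', 2), ('C', 10), ('D', 12)]
```

Here each node runs exactly once, so this fits one-shot DAG jobs; the queue pipeline in the answer is arguably better suited to streaming many items through the same graph.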
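In the case of a DAG, a process can have multiple parent processes. That makes me doubt whether maintaining a separate queue for each process is valid; it might not work correctly. Sometimes the program will have to wait for all of a node's parents to finish before it can run that node.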
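On the multiple-parents concern, one common way to keep per-node queues in a DAG (an assumption here, not something from the thread) is a join step. Both parent stages write (parent, key, value) tuples into the child's single input queue, and the child holds partial results until every parent has delivered a value for the same key. FanInJoin and join_worker are hypothetical names, and each item is assumed to carry a hashable key such as an index. The 'END' sentinel should only be sent after all parent workers have been joined, the same ordering shutdown_process gives in the answer.

```python
from queue import Queue  # stand-in; multiprocessing.Queue has the same get()/put()

SENTINEL = 'END'


class FanInJoin:
    """Hypothetical helper: hold partial results until every parent reports."""

    def __init__(self, parents):
        self.parents = tuple(parents)
        self.pending = {}  # key -> {parent: value}

    def add(self, parent, key, value):
        """Record one parent's value for key; return all values once complete."""
        if parent not in self.parents:
            raise ValueError(f'unknown parent {parent!r}')
        slot = self.pending.setdefault(key, {})
        slot[parent] = value
        if len(slot) < len(self.parents):
            return None  # still waiting on at least one parent for this key
        del self.pending[key]
        return tuple(slot[p] for p in self.parents)  # in constructor order


def join_worker(in_q, out_q, parents):
    """Child-node loop: every parent puts (parent, key, value) on in_q."""
    join = FanInJoin(parents)
    for parent, key, value in iter(in_q.get, SENTINEL):
        combined = join.add(parent, key, value)
        if combined is not None:
            out_q.put((key, combined))


if __name__ == '__main__':
    q_in, q_out = Queue(), Queue()
    for msg in [('C', 7, 70), ('B', 7, 700), SENTINEL]:
        q_in.put(msg)
    join_worker(q_in, q_out, parents=['B', 'C'])
    print(q_out.get())  # (7, (700, 70))
```

Run a single join worker per child node (or route messages by key): if several workers share one input queue, the two halves of a key can land in different processes and never meet.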