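I am the developer of Ruffus. I'm not sure I entirely understand what you are trying to do, but here goes:
Waiting for jobs that take different amounts of time to finish before running the next stage of a pipeline is exactly what Ruffus is for, so this should hopefully be straightforward.
The first question is whether you know up front, i.e. before the pipeline runs, which files are going to be created. Let's start by assuming you do.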
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
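Let's write a dummy function that creates one file each time it is called. In Ruffus, the input and output file names are passed as the first two parameters, in that order. We have no input file name, so our function calls should look like this: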
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
The definition of create_file would look like this:
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    with open(output_file_name, "w") as output_file:
        output_file.write("dummy file")
Each of these files will be created by one of 3 separate calls to create_file. These calls can run in parallel if you wish.
pipeline_run([create_file], multiprocess = 5)
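If it helps to see what that parameter list amounts to, here is a rough plain-Python sketch. It is not Ruffus itself and says nothing about how Ruffus is implemented: each (input, output) pair simply becomes one independent call, and a thread pool stands in for the multiprocess option. The temporary directory and the names work_dir and job_params are made up for the illustration.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def create_file(no_input_file_name, output_file_name):
    # Same shape as the Ruffus task: input name first, output name second.
    with open(output_file_name, "w") as output_file:
        output_file.write("dummy file")


# Hypothetical scratch directory so the sketch does not touch real files.
work_dir = tempfile.mkdtemp()
filenames = [os.path.join(work_dir, fn)
             for fn in ["one.file", "two.file", "three.file"]]

# One (input, output) pair per job, mirroring the list passed to @files.
job_params = [(None, fn) for fn in filenames]

# Each pair becomes one independent call; the pool is only a stand-in
# for running the jobs in parallel.
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(create_file, *params) for params in job_params]
    for future in futures:
        future.result()  # re-raise any exception raised inside a job

created = sorted(os.path.basename(fn) for fn in filenames if os.path.exists(fn))
print(created)
```

The point is only that the three jobs do not depend on each other, which is why they can safely run at the same time.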
Now to combine the files. The "@merge" decorator is set up for precisely this. We just need to link it to the previous function:
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    with open(output_file_name, "w") as output_file:
        for i in input_file_names:
            with open(i) as input_file:
                output_file.write(input_file.read())
This will only call merge_file once all the files from the three calls to create_file() are ready.
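For intuition only, the same "wait for everything, then merge" dependency can be sketched with the standard library. This is an illustration of the behaviour, not Ruffus code: the thread pool, the temporary directory, and the shortened sleep are all assumptions made for the sketch.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, wait
from random import randint
from time import sleep


def create_file(output_file_name):
    # Simulate jobs that take different amounts of time (shortened here).
    sleep(randint(1, 5) / 100.0)
    with open(output_file_name, "w") as output_file:
        output_file.write("dummy file")
    return output_file_name


def merge_file(input_file_names, output_file_name):
    # Concatenate every input file into the single output file.
    with open(output_file_name, "w") as output_file:
        for name in input_file_names:
            with open(name) as input_file:
                output_file.write(input_file.read())


# Hypothetical scratch directory so the sketch does not touch real files.
work_dir = tempfile.mkdtemp()
filenames = [os.path.join(work_dir, fn)
             for fn in ["one.file", "two.file", "three.file"]]
merged_name = os.path.join(work_dir, "merge.file")

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(create_file, fn) for fn in filenames]
    # Block until every create job has finished, whatever order they end in.
    done, not_done = wait(futures)

# Only now, with nothing left pending, is it safe to merge.
assert not not_done
merge_file(sorted(future.result() for future in done), merged_name)

with open(merged_name) as merged:
    merged_text = merged.read()
print(merged_text)
```

With Ruffus you do not write the waiting step yourself; linking merge_file to create_file through the decorator is what expresses the dependency.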
The entire code is as follows:
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
from random import randint
from time import sleep
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    with open(output_file_name, "w") as output_file:
        output_file.write("dummy file")
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    with open(output_file_name, "w") as output_file:
        for i in input_file_names:
            with open(i) as input_file:
                output_file.write(input_file.read())
pipeline_run([merge_file], multiprocess = 5)
And this is the result:
>>> pipeline_run([merge_file], multiprocess = 5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file