1

我正在使用ruffus編寫管道。我有一個並行調用的函數,它會創建多個文件。我想創建一個函數「combineFiles()」,在完成所有這些文件後調用它。由於它們在羣集上並行運行,它們不會一起完成。我寫了一個函數'getFilenames()',它返回需要創建的一組文件名,但我怎麼才能讓combineFiles()等待它們在那裏呢?要求在Ruffus管道中運行函數之前創建一組文件

我試過如下:

@pipelineFunction 
@files(getFilenames) 
def combineFiles(filenames): 
    # I should only be called if every file in the list 'filenames' exists 

我也試過裝飾:

@merge(getFilenames) 

但這也不管用。在由getFilenames給出的文件被創建之前,combineFiles仍然會被錯誤地調用。如何使那些文件在那裏的combineFiles有條件?

謝謝。

回答

2

我是Ruffus的開發者。我不確定我完全理解你正在嘗試做什麼,但這裏有:

等待工作需要完成不同的時間才能完成管道的下一個階段,這正是Ruffus所做的工作這有希望是直截了當的。

第一個問題是你知道哪些文件是在前面創建的,即在管道運行之前?讓我們先假設你做。

from ruffus import * 
filenames = ["one.file", "two.file", "three.file"] 

讓我們編寫一個虛擬函數,它在每次調用時創建一個文件。在Ruffus中,任何輸入和輸出文件名都分別包含在前兩個參數中。我們沒有輸入文件名,所以我們的函數調用應該是這樣的:

create_file(None, "one.file") 
create_file(None, "two.file") 
create_file(None, "three.file") 

create_file的定義是這樣的:

@files([(None, fn) for fn in filenames]) 
def create_file(no_input_file_name, output_file_name): 
    open(output_file_name, "w").write("dummy file") 

這些文件中的每一個將在3次獨立的呼叫創建到create_file。如果你願意,這些可以並行運行。

pipeline_run([create_file], multiprocess = 5) 

現在來合併文件。 「@Merge」裝飾器確實是爲此設置的。我們只需要它鏈接到以前的功能:

@merge(create_file, "merge.file") 
def merge_file(input_file_names, output_file_name): 
    output_file = open(output_file_name, "w") 
    for i in input_file_names: 
     output_file.write(open(i).read()) 

這隻會調用merge_file當所有的文件都是從三個電話準備create_file()。

整個代碼如下:

from ruffus import * 
filenames = ["one.file", "two.file", "three.file"] 

from random import randint 
from time import sleep 

@files([(None, fn) for fn in filenames]) 
def create_file(no_input_file_name, output_file_name): 
    # simulate create file process of indeterminate complexity 
    sleep(randint(1,5)) 
    open(output_file_name, "w").write("dummy file") 

@merge(create_file, "merge.file") 
def merge_file(input_file_names, output_file_name): 
    output_file = open(output_file_name, "w") 
    for i in input_file_names: 
     output_file.write(open(i).read()) 


pipeline_run([merge_file], multiprocess = 5) 

而這是結果:

>>> pipeline_run([merge_file], multiprocess = 5) 

    Job = [None -> two.file] completed 
    Job = [None -> three.file] completed 
    Job = [None -> one.file] completed 
Completed Task = create_file 
    Job = [[one.file, three.file, two.file] -> merge.file] completed 
Completed Task = merge_file 
相關問題