我有一個luigi
預處理任務,將我的原始數據拆分爲更小的文件。這些文件將被實際的管道處理。luigi相關性在運行時更改
所以關於參數,我想要求每個管道有一個預處理文件ID作爲參數。但是,此文件ID僅在預處理步驟中生成,因此僅在運行時才知道。爲了說明我的想法,我提供這個不工作的代碼:
import luigi
import subprocess
import random
class GenPipelineFiles(luigi.Task):
input_file = luigi.Parameter()
def requires(self):
pass
def output(self):
for i in range(random.randint(0,10)):
yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i))
def run(self):
for iout in self.output:
command = "touch {}".format(iout.fname)
subprocess.call(command, shell=True)
class RunPipelineOnSmallChunk(luigi.Task):
pass
class Experiment(luigi.WrapperTask):
input_file = luigi.Parameter(default="ex")
def requires(self):
file_ids = GenPipelineFiles(input_file=self.input_file)
for file_id in file_ids:
yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id)
luigi.run()
的包裝任務Experiment
應該
第一,在某種程度上需要對原始數據的分割成文件
其次,需要使用獲得的預處理文件ID的實際流水線。
在GenPipelineFiles
輸出文件的隨機數表明此不能被硬編碼到Experiment
的requires
。
可能與此問題有關的一個問題是,一個luigi
任務正確地只有一個輸入目標和一個輸出目標。可能關於如何在GenPipelineFiles
中建模多個輸出的說明也可以解決該問題。
你能解釋一下你在這一點上所得到的錯誤? –
luigi依賴關係圖是基於'require'函數的返回而創建的。在這裏,GePipelineFiles永遠不會被返回,因此不會被安排。這段代碼不是我的實際代碼,從來不打算沒有錯誤地運行。它只是爲了說明依賴性問題 - 我面臨的問題 –