2017-02-21 73 views
0

我有一個luigi預處理任務,將我的原始數據拆分爲更小的文件。這些文件將被實際的管道處理。luigi相關性在運行時更改

所以關於參數,我想要求每個管道有一個預處理文件ID作爲參數。但是,此文件ID僅在預處理步驟中生成,因此僅在運行時才知道。爲了說明我的想法,我提供這個不工作的代碼:

import luigi 
import subprocess 
import random 


class GenPipelineFiles(luigi.Task): 

    input_file = luigi.Parameter() 

    def requires(self): 
     pass 

    def output(self): 

     for i in range(random.randint(0,10)): 
      yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i)) 

    def run(self): 

     for iout in self.output: 
      command = "touch {}".format(iout.fname) 
      subprocess.call(command, shell=True) 


class RunPipelineOnSmallChunk(luigi.Task): 
    pass 


class Experiment(luigi.WrapperTask): 

    input_file = luigi.Parameter(default="ex") 

    def requires(self): 

     file_ids = GenPipelineFiles(input_file=self.input_file) 

     for file_id in file_ids: 
      yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id) 


luigi.run() 

的包裝任務Experiment應該

  1. 第一,在某種程度上需要對原始數據的分割成文件

  2. 其次,需要使用獲得的預處理文件ID的實際流水線。

GenPipelineFiles輸出文件的隨機數表明此不能被硬編碼到Experimentrequires

可能與此問題有關的一個問題是,一個luigi任務正確地只有一個輸入目標和一個輸出目標。可能關於如何在GenPipelineFiles中建模多個輸出的說明也可以解決該問題。

+0

你能解釋一下你在這一點上所得到的錯誤? –

+0

luigi依賴關係圖是基於'require'函數的返回而創建的。在這裏,GePipelineFiles永遠不會被返回,因此不會被安排。這段代碼不是我的實際代碼,從來不打算沒有錯誤地運行。它只是爲了說明依賴性問題 - 我面臨的問題 –

回答

0

處理多個輸出的一種簡單方法是創建一個以輸入文件命名的目錄,並將輸入文件從分割文件放到以輸入文件命名的目錄中。這樣,依賴任務可以檢查目錄是否存在。假設我有一個輸入文件123.txt,然後創建一個包含文件1.txt,2.txt,3.txt的目錄123_split作爲GenPipelineFiles的輸出,然後使用1.txt,2.txt處理目錄123_, 3.txt作爲RunPipelineOnSmallChunk的輸出。

對於您的requires方法Experiment,您必須以列表形式返回您想要運行的任務。你寫的file_ids = GenPipelineFiles(input_file=self.input_file)的方式讓我覺得該對象的run方法沒有被調用,因爲它沒有被方法返回。

下面是一些示例代碼,可以針對每個文件(但不是每個文件的任務)使用目標。我仍然認爲讓一個目錄或者一個哨點文件的單個輸出目標脫離某種類型以表明你已經完成是比較安全的。除非任務確保創建每個目標,否則原子性將丟失。

PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler 

sampletask.py

import luigi 
import os 
import subprocess 
import random 


class GenPipelineFiles(luigi.Task): 

    inputfile = luigi.Parameter() 
    num_targets = random.randint(0,10) 

    def requires(self): 
     pass 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "split_{}".format(self.get_prefix()) 

    def output(self): 
     targets = [] 
     for i in range(self.num_targets): 
      targets.append(luigi.LocalTarget(" {}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 


class RunPipelineOnSmallChunk(luigi.Task): 

    inputfile = luigi.Parameter(default="test") 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "processed_{}".format(self.get_prefix()) 

    @staticmethod 
    def clean_input_path(path): 
     return path.replace("split", "processed") 

    def requires(self): 
     return GenPipelineFiles(self.inputfile) 

    def output(self): 
     targets = [] 
     for target in self.input(): 
      targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 
+0

感謝您的回答,我認爲在您的提案中有兩件事情不是最優的,但也許我錯誤地理解了您的答案:1.如果'RunPipelineOnSmallChunk'需要整體上述任務的目錄中,RunPipelineOnSmallChunk任務不是原子的,而是必須循環遍歷所有文件才能執行必要的計算。 2。如果我在'Experiment'中需要兩種類型的任務,而這些任務本身需要相互依賴,Luigi調度器會不必要地嘗試多次運行子任務。 –

+0

對於2,我會讓實驗要求'RunPipelineOnSmallChunk'和'RunPipelineOnSmallChunk'需要'GenPipelineFiles'。 – MattMcKnight

+0

在那裏添加了一個修改後的代碼版本 – MattMcKnight