2017-04-04 40 views
0

我到路易吉框架的發展,我想在一個類來執行2·喬布斯(兩者都是流水線作業),但在某種程度上是作業2只有當Job1完全執行時才能運行。路易吉 - 執行2個流水線作業,(必須同步,不paralell)

class ExecuteTwoJobs(luigi.Task): 
    def requires(self): 
     reqs = [] 
     reqs.append(Job1(*args, **kwargs)) 
     reqs.append(Job2(*args, **kwargs)) 
     return reqs 

    def output(self): 
     //statements 

    def run(self): 
     //statements 

有什麼辦法可以執行job1,一旦它完成,然後去執行Job2。

任何幫助深表感謝

回答

1

做到這一點的方式運行作業2,由作業2要求的作業1。

class Job2(luigi.Task): 

    def requires(self): 
     yield Job1(*args, **kwargs)) 

    def output(self): 
     //statements 

    def run(self): 
     //statements 

然後運行作業2這樣:

luigi --module <your_module> Job2 <tasks params> 

和Luigi將首先運行作業1,並在完成後,它會運行作業2。

+0

感謝提供解決方案,是的,我們確實必須這樣做在你的方式,但我想是在一個類來執行它們並不能使被作業2要求的作業1。 說作業3既需要作業1和作業2,所以在這種情況下,一旦作業3執行:作業1和作業2的流水線, 有什麼辦法:我可以讓作業1作業2上手,甚至之前執行。 預先感謝 –

+0

@TalatParwez我覺得你不理解路易吉是如何工作的噢,我不能夠理解你。跑我張貼解決任務的方式是'路易--module your_module作業2 <任務參數>':這樣路易吉將檢查作業1已alredy被運行,如果沒有,它會執行它。 Job1完成後,它將運行Job2。請注意,您從未告訴Luigi運行Job1(從命令行),您只是要求它運行Job2。 – matagus

1

在普通情況下@matagus的答案是建議的,但如果由於某些特殊原因無法制作Job2所需的Job1,您可以按如下方式使用dynamic dependency

class ExecuteTwoJobs(luigi.ExternalTask): 

    def output(self): 
     //statements 

    def run(self): 
     yield Job1(*args, **kwargs) 
     yield Job2(*args, **kwargs) 

在執行這個任務,如果沒有完成Job1被執行,Job2之後運行。不過,我們可以從DAG工作流程管理中獲得更少的好處。