2017-04-19 54 views
10

從氣流文檔:氣流:模式運行的氣流subdag一次

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything 

我明白subdagoperator作爲一個BackfillJob實際實現的,因此,我們必須提供一個schedule_interval給操作者。然而,有沒有辦法獲得一個subdag的schedule_interval="@once"的語義等價物?我擔心,如果我使用設置schedule_interval="@daily"作爲subdag,如果subdag運行時間超過一天,則subdag可能會運行多次。

def subdag_factory(parent_dag_name, child_dag_name, args): 
    subdag = DAG(
     dag_id="{parent_dag_name}.{child_dag_name}".format(
      parent_dag_name=parent_dag_name, child_dag_name=child_dag_name 
     ), 
     schedule_interval="@daily", # <--- this bit here 
     default_args=args 
    ) 

    ... do more stuff to the subdag here 
    return subdag 

TLDR:如何捏造出來

回答

2

「只有一次每父DAG的觸發運行此subdag」嘗試用時間表=無爲subdag外部觸發模式。在這種情況下,它將只在父母觸發時才運行dag

+0

爲了說明起見,你建議使用[TriggerDagRunOperator](https://airflow.incubator.apache.org/code.html?highlight=trigger%20dagrun#airflow.operators.TriggerDagRunOperator)來觸發一個dag而不用時間表?這個subdag的關鍵是我們想要阻塞語義,觸發器dagrun運算符只是觸發一個dagrun,然後繼續前進,不會等到dagrun完成。另外,你不會在氣流UI中獲得一個subdag運行的透明度,你只知道一些隨機的dagrun被觸發。 – gnicholas

4

我發現 [email protected]對我的子標籤工作得很好。也許我的版本已經過時了,但即使所有任務都成功(或跳過)了,我的subdags 也沒有出現問題。

運行相當快樂地生活在我的機器上,現在實際的例子代碼:

subdag_name = ".".join((parent_name,child_name)) 
logging.info(parent_name) 
logging.info(subdag_name) 
dag_subdag = DAG(
    dag_id=subdag_name, 
    default_args=dargs, 
    schedule_interval="@once", 
) 

其實,我最初建幾乎所有我的DAG爲榮耀CFG文件我subdags。不知道經過一些試驗和錯誤之後這個想法有多好,但是時間間隔從來都不是我的阻擋者。

我正在運行一個相對較新的1.8版本,只有很少的定製。我一直在遵循示例DAG建議將我的子標籤保存在dag文件夾內的文件夾中,以便它們不會顯示在DagBag中。

+0

我使用airflow 1.7.1.3和1.8不是ATM選項,因爲該版本意外地破壞了自定義執行器插件。我將看看1.8看看是否可以使用'「@once」'時間表來運行subdags,但如果這是真的,那麼我會感到驚訝,因爲文檔說它不是。 – gnicholas

+0

運氣好嗎?我的代碼仍然快樂地逃跑。 我試圖在1.7中查找規範的方法來爲你做這件事。我能夠找到的最接近的東西(假設'@ once'不可行)設置你的'execution_timeout'爲實際的subdag任務比你在subdag本身設置的執行頻率短。這樣,在你的subdag可以啓動更多任務之前,你會超時。我知道這是猜測,但我不能輕易地在我們的叉子中找到與您所在的叉子一樣古老的氣流。 – apathyman

+1

很想聽到作者爲什麼在文檔明確表示不應該的時候這麼做。 – qwwqwwq