2017-06-24 67 views
1

我有一個dag檢查FTP服務器上的文件(氣流運行在不同的服務器上)。如果存在文件,則文件被移到S3(我們在這裏存檔)。從那裏,文件名被傳遞給Spark提交作業。火花作業將通過S3(不同服務器上的Spark集羣)處理文件。我不確定是否需要有多個dag,但這裏是流程。我想要做的只是在S3存儲桶中存在文件時才運行Spark作業。如何在氣流中途成功退出任務?

我嘗試使用S3傳感器,但它符合超時標準後失敗/超時,因此整個DAG設置爲失敗。

check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done 

我只想要做的FTP檢查,只有當一個或多個文件移入S3腳本後運行一切。

回答

1

您可以有2個不同的DAG。一個只有S3傳感器,並且每5分鐘一直運行。如果它找到該文件,它會觸發第二個DAG。第二個DAG將文件提交給S3並進行歸檔(如果完成)。您可以在第一個DAG中使用TriggerDagRunOperator進行觸發。

+0

如果沒有找到文件,會不會退出w /錯誤代碼會發生什麼?因此有人需要重新開展工作,不是嗎? – luckytaxi

+0

第一個DAG(有兩個任務,S3Sensor和TriggerDagRunOperator)可以計劃每五分鐘運行一次。這意味着傳感器每5分鐘運行一次,如果發現文件,則會觸發第二個DAG。否則,它什麼都不做,5分鐘後重播。如果它以一個錯誤代碼退出並不重要(您不應將第一個DAG的depends_on_past設置爲true)。 – Him

0

他給出的答案將起作用。 另一種選擇是使用傳感器具有的「soft_fail」參數(它是來自BaseSensorOperator的參數)。如果您將此參數設置爲True,而不是失敗任務,則它將跳過它,並且分支中的所有後續任務也將被跳過。

有關更多信息,請參閱airflow code

相關問題