我有一個dag檢查FTP服務器上的文件(氣流運行在不同的服務器上)。如果存在文件,則文件被移到S3(我們在這裏存檔)。從那裏,文件名被傳遞給Spark提交作業。火花作業將通過S3(不同服務器上的Spark集羣)處理文件。我不確定是否需要有多個dag,但這裏是流程。我想要做的只是在S3存儲桶中存在文件時才運行Spark作業。如何在氣流中途成功退出任務?
我嘗試使用S3傳感器,但它符合超時標準後失敗/超時,因此整個DAG設置爲失敗。
check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done
我只想要做的FTP檢查,只有當一個或多個文件移入S3腳本後運行一切。
如果沒有找到文件,會不會退出w /錯誤代碼會發生什麼?因此有人需要重新開展工作,不是嗎? – luckytaxi
第一個DAG(有兩個任務,S3Sensor和TriggerDagRunOperator)可以計劃每五分鐘運行一次。這意味着傳感器每5分鐘運行一次,如果發現文件,則會觸發第二個DAG。否則,它什麼都不做,5分鐘後重播。如果它以一個錯誤代碼退出並不重要(您不應將第一個DAG的depends_on_past設置爲true)。 – Him