apache-airflow

    2熱度

    3回答

    我需要引用由BashOperator返回的變量。我可能做錯了,請原諒我。在我的task_archive_s3_file中,我需要從get_s3_file獲取文件名。該任務僅將{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}作爲字符串打印而不是值。 如果我使用bash_command,則打印值正確。 get_s3_file = PythonOpera

    0熱度

    1回答

    我試圖找出Airflow如何在多租戶環境中工作。具體來說,要求應該如下所示: TeamA和TeamB兩個團隊正在使用單個Airflow實例。 團隊的A和B每個都有自己的服務用戶帳戶:serviceUserA和ServiceUserB他們應該運行他們的工作。 出於安全原因,團隊A不應該能夠創建在ServiceUserB下運行的作業,反之亦然。 在這一點上我不清楚需求3)是否可以用Airflow來滿足

    1熱度

    2回答

    我想觸發simplehttpoperator,像這樣: 氣流trigger_dag test_trigger --conf '{ 「名」: 「東西」}' 然後我用pythonoperator python_callable使用kwargs [ 'dag_run' 接受的參數] .conf,我想將['dag_run']。conf傳遞給simplehttpoperator,我該怎麼做?任何人都可以幫忙

    1熱度

    1回答

    我在使用LocalScheduler選項的EC2實例上使用氣流。我已經調用airflow scheduler和airflow webserver,一切似乎都運行良好。也就是說,在將cron字符串提供給schedule_interval用於「每10分鐘執行一次」'*/10 * * * *'後,該作業默認每24小時繼續執行一次。下面的代碼頭: from datetime import datetime

    3熱度

    1回答

    我已經搜索了很多鏈接,但沒有找到解決我遇到的問題的任何方法。我已經看到了將密鑰/ var傳遞給airflow UI的選項,但是最終用戶使用哪個密鑰與哪個dag關聯是非常令人困惑的。有沒有什麼辦法可以實現如下功能: While running an airflow job, end user will be asked for values to some parameters and after

    2熱度

    2回答

    使用最新版本的apache airflow。從LocalExecutor開始,在該模式下,一切工作正常,除了Web UI狀態需要使用CeleryExecutor的一些交互。使用Redis安裝和配置Celery執行程序,將Redis配置爲代理程序URL和結果後端。 它出現在第一個工作,直到任務計劃此時它提供了以下錯誤: File "/bin/airflow", line 28, in <module

    0熱度

    1回答

    是否有任何直接的方法可以將shell腳本運行到dataproc集羣中。目前我可以通過pysparkoperator(它調用aonther python文件,然後這個python文件調用shell腳本)運行shell。我搜查了很多鏈接,但至今沒有找到任何直接的方式。 如果有人能告訴我最簡單的方法,對我來說真的很有幫助。與sh運營商[1]

    1熱度

    1回答

    我試圖用氣流腳本來運行存在於雲存儲HQL文件,有兩個參數,通過它我們可以通過DataprocHiveOperator路徑: 查詢: 'GS://bucketpath/filename.q' Error occuring - cannot recognize input near 'gs' ':' '/' query_uri: 'GS://bucketpath/filename.q' Error o

    0熱度

    1回答

    試圖促成氣流,但不能在Ubuntu 16.0.4上運行並運行。一些事情正在與kerbos進行。 https://github.com/apache/incubator-airflow py34-hdp-airflow_backend_postgres runtests: commands[2] | sudo /home/dalupus/incubator-airflow/scripts/ci/se

    0熱度

    1回答

    我有一個Airflow操作員在第三方服務上踢任務,然後監視該作業的進度。在代碼中,如果氣流工人重新啓動(通常是由於代碼部署)的執行貌似 def execute(self, context): external_id = start_external_job() wait_until_external_job_completes(external_id) 這個任務的一個實例運