我已經啓動了Airflow網絡服務器並安排了一些dag。我可以在Web GUI上看到dag。氣流:如何刪除DAG?
如何從運行中刪除特定的DAG並在Web GUI中顯示?有沒有一個Airflow CLI命令來做到這一點?
我環顧四周,但無法找到一個簡單的方法來刪除DAG,一旦它已被加載和計劃的答案。
我已經啓動了Airflow網絡服務器並安排了一些dag。我可以在Web GUI上看到dag。氣流:如何刪除DAG?
如何從運行中刪除特定的DAG並在Web GUI中顯示?有沒有一個Airflow CLI命令來做到這一點?
我環顧四周,但無法找到一個簡單的方法來刪除DAG,一旦它已被加載和計劃的答案。
Airflow沒有內置的功能可以爲您做到這一點。爲了刪除DAG,請將其從存儲庫中刪除,並刪除Airflow Metastore表中的數據庫條目 - dag。
我還必須重新啓動計劃和網絡服務器所在的機器運行完成清理。簡單地重新啓動Web服務器和調度程序是不夠的。 –
我剛剛寫了一個腳本,刪除與特定dag相關的所有內容,但這僅適用於MySQL。如果您使用PostgreSQL,則可以編寫不同的連接器方法。最初由蘭斯發佈的命令是https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 我只是把它放在腳本中。希望這可以幫助。格式:蟒蛇script.py dag_id
import sys
import MySQLdb
dag_input = sys.argv[1]
query = {'delete from xcom where dag_id = "' + dag_input + '"',
'delete from task_instance where dag_id = "' + dag_input + '"',
'delete from sla_miss where dag_id = "' + dag_input + '"',
'delete from log where dag_id = "' + dag_input + '"',
'delete from job where dag_id = "' + dag_input + '"',
'delete from dag_run where dag_id = "' + dag_input + '"',
'delete from dag where dag_id = "' + dag_input + '"' }
def connect(query):
db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
cur = db.cursor()
cur.execute(query)
db.commit()
db.close()
return
for value in query:
print value
connect(value)
不知道爲什麼Apache的氣流沒有明顯的和簡單的方法來刪除DAG
這是PR公開,但尚未合併。對於那些感興趣的鏈接 - https://github.com/apache/incubator-airflow/pull/2199。 –
這是使用PostgresHook我適應代碼默認的connection_id。
import sys
from airflow.hooks.postgres_hook import PostgresHook
dag_input = sys.argv[1]
hook=PostgresHook(postgres_conn_id= "airflow_db")
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
sql="delete from {} where dag_id='{}'".format(t, dag_input)
hook.run(sql, True)
我認爲你也可以將'task_fail'和'dag_stats'添加到表 – marengaz
我已經編寫了一個腳本,用於刪除與默認SQLite數據庫相關的特定dag的所有元數據。這是基於耶穌的回答,但是從Postgres改編爲SQLite。用戶應將../airflow.db
設置爲相對於默認airflow.db文件(通常爲~/airflow
)存儲script.py的任何位置。要執行,請使用python script.py dag_id
。
import sqlite3
import sys
conn = sqlite3.connect('../airflow.db')
c = conn.cursor()
dag_input = sys.argv[1]
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
query = "delete from {} where dag_id='{}'".format(t, dag_input)
c.execute(query)
conn.commit()
conn.close()
這個表的列表中,這是一個很好的解決方案,至少在PR合併之前是這樣的 –
您可以清除一組任務實例,就好像他們從來沒有跑:
airflow clear dag_id -s 2017-1-23 -e 2017-8-31
然後從DAG的文件夾
這可能會導致'dag'表中有一些未清理的數據 – Chengzhi
沒有CLI這個刪除DAG文件。但是,如果您想要嘗試恢復它,那麼就會放棄pull請求:https://github.com/apache/incubator-airflow/pull/1344 – TheF1rstPancake