我有一個長時間運行的步驟(通常大約2小時,但它根據運行的單元而變化)的氣流中的SubDAG。在1.7.1.3下,這一步將始終導致AIRFLOW-736,並且當所有步驟都成功時,SubDAG將在'運行'狀態中停頓。我們可以解決這個問題,因爲我們沒有在SubDAG之後執行步驟,只需在數據庫中手動將SubDagOperator標記爲成功(而不是運行)即可。氣流 - SubDag中長時間運行的任務在一小時後標記爲失敗
- 下張樹庭我們的調度和工人
- 通過畫中畫,卸載氣流和安裝Apache的氣流(版本1.8.1:
我們現在會通過執行以下操作測試氣流1.8.1,升級)
- 辦刊氣流UPGRADEDB
- 運行氣流調度和工人
隨着系O在沒有觸動的情況下,相同的DAG在長時間運行的任務達到1小時標記後大致100%的時間內失敗(儘管奇怪的是,不是完全3600秒後 - 小時刻度之後可以是30到90秒之間的任何時間)消息「Executor報告任務實例已完成(失敗),儘管任務說它正在運行。 「但是,任務本身仍然繼續在工作人員身上運行,不管怎樣,在調度人員認爲任務失敗(jobs.py的this line)錯誤的基礎上,調度人員之間存在分歧,儘管有實際的任務運行良好
我已經證實,在某種程度上,狀態是'失敗'在氣流數據庫的task_instance表中,因此,我想知道什麼可以設置任務狀態失敗,當任務本身仍在運行
下面是一個示例DAG觸發問題:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}
def define_sub(dag, step_name, sleeptime):
op = BashOperator(
task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
)
return dag
def gen_sub_dag(parent_name, step_name, sleeptime):
sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
define_sub(sub, step_name, sleeptime)
return sub
long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)
long_sub_dag = SubDagOperator(
subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)
今天,我遇到了同樣的問題,一個長時間運行任務的Subdag,稍微過了一個多小時,我說錯誤信息。有趣的是,調度程序試圖重新啓動任務,由於資源超出氣流阻塞而失敗。原始任務繼續運行,並正確結束,在任務結束之前,氣流將該標籤標記爲失敗。 –
你在使用什麼執行器。它是芹菜+ Redis? –