I have a simple DAG of three operators. The first one is a PythonOperator with our own functionality, and the other two are standard operators from airflow.contrib (FileToGoogleCloudStorageOperator and GoogleCloudStorageToBigQueryOperator, to be precise). They run in sequence. Our custom task produces a number of files, typically between 2 and 5, depending on the parameters. All of these files have to be processed separately by the subsequent tasks. That means I need several downstream branches, but how many there will be is unknowable before the DAG runs. How can an Airflow DAG branch dynamically?

How would you approach this problem?
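For context, here is a minimal sketch of the current linear pipeline; the task IDs, bucket, table, schema and the produce_files callable are placeholders, not the real code:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator


def produce_files(**kwargs):
    # Stand-in for our custom logic, which writes 2-5 files to /tmp.
    pass


dag = DAG("linear_example", start_date=datetime(2018, 1, 1), schedule_interval=None)

generate = PythonOperator(
    task_id="generate_files",
    python_callable=produce_files,
    provide_context=True,
    dag=dag)

to_gcs = FileToGoogleCloudStorageOperator(
    task_id="to_gcs",
    src="/tmp/output.json",
    dst="output.json",
    bucket="my-bucket",
    mime_type="application/json",
    dag=dag)

to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id="to_bq",
    bucket="my-bucket",
    source_objects=["output.json"],
    destination_project_dataset_table="myproject.mydataset.mytable",
    source_format="NEWLINE_DELIMITED_JSON",
    schema_fields=[{"name": "field", "type": "STRING", "mode": "NULLABLE"}],  # hypothetical schema
    dag=dag)

generate >> to_gcs >> to_bq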
UPDATE:

Using the BranchPythonOperator that jhnclvr mentioned in another reply as a point of departure, I created an operator that either skips or continues executing a branch, depending on a condition. This approach is feasible only because the highest possible number of branches is known in advance and is small enough.
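For reference, a bare BranchPythonOperator looks roughly like this (a sketch with made-up task IDs, assuming a dag object is already defined):

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator


def choose_branch(**kwargs):
    # Return the task_id of the one branch to follow; the other is skipped.
    data_list = kwargs["dag_run"].conf.get("data_list", [])
    return "few_files" if len(data_list) <= 2 else "many_files"


branch = BranchPythonOperator(
    task_id="branch",
    python_callable=choose_branch,
    provide_context=True,
    dag=dag)

branch >> DummyOperator(task_id="few_files", dag=dag)
branch >> DummyOperator(task_id="many_files", dag=dag)

The catch is that BranchPythonOperator follows a single chosen path, while here several branches may need to run at once; hence the per-branch skip operator below.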
The operator:
from datetime import datetime

from airflow import settings
from airflow.models import TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State


class SkipOperator(PythonOperator):
    def execute(self, context):
        # Run the wrapped callable; it returns True to continue the branch.
        boolean = super(SkipOperator, self).execute(context)
        session = settings.Session()
        for task in context['task'].downstream_list:
            if boolean is False:
                # Mark the direct downstream task as SKIPPED in the metadata DB.
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                ti.state = State.SKIPPED
                ti.start_date = datetime.now()
                ti.end_date = datetime.now()
                session.merge(ti)
        session.commit()
        session.close()
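Unlike BranchPythonOperator, which selects one path, each SkipOperator gates its own branch: when the wrapped callable returns False, its direct downstream task instances are written to the metadata database as SKIPPED, and with the default trigger rule the rest of that branch is skipped as well.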
Usage:
from functools import partial

from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# dag_name, default_args, gs_bucket, connection_id, CustomOperator and
# CustomFileToGoogleCloudStorageOperator are defined elsewhere in our code base.


def check(i, templates_dict=None, **kwargs):
    # Continue branch i only if at least i + 1 items were passed in.
    return len(templates_dict["data_list"].split(",")) > i


dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=None
)

load = CustomOperator(
    task_id="load_op",
    bash_command=' '.join([
        './command.sh',
        '--data-list {{ dag_run.conf["data_list"]|join(",") }}'
    ]),
    dag=dag
)

for i in range(0, 5):
    condition = SkipOperator(
        task_id=f"{dag_name}_condition_{i}",
        python_callable=partial(check, i),
        provide_context=True,
        templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'},
        dag=dag
    )

    gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i

    load_to_gcs = CustomFileToGoogleCloudStorageOperator(
        task_id=f"{dag_name}_to_gs_{i}",
        src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i,
        bucket=gs_bucket,
        dst=gs_filename,
        mime_type='application/json',
        google_cloud_storage_conn_id=connection_id,
        dag=dag
    )

    load_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id=f"{dag_name}_to_bq_{i}",
        bucket=gs_bucket,
        source_objects=[gs_filename],
        source_format='NEWLINE_DELIMITED_JSON',
        destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i,
        bigquery_conn_id=connection_id,
        schema_fields=[],  # actual schema omitted here
        google_cloud_storage_conn_id=connection_id,
        write_disposition='WRITE_TRUNCATE',
        dag=dag
    )

    # Each branch hangs off the shared load task and is gated by its condition.
    condition.set_upstream(load)
    load_to_gcs.set_upstream(condition)
    load_to_bq.set_upstream(load_to_gcs)
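Since everything is driven by dag_run.conf, the DAG is triggered with the list of items to process, for example (hypothetical values, assuming the DAG id is my_dag):

airflow trigger_dag my_dag --conf '{"data_list": ["a", "b", "c"], "file_extension": "json"}'

With three items, check returns True for i = 0..2, so those branches run, while branches 3 and 4 are skipped.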