2017-07-25 59 views
1

考慮以下DAG示例,其中第一個任務get_id_creds從數據庫中提取憑證列表。此操作告訴我數據庫中的哪些用戶能夠運行進一步的數據預處理,並將這些ID寫入文件/tmp/ids.txt。然後,我將這些ID掃描到我的DAG中,並使用它們生成可並行運行的upload_transaction任務列表。如何動態迭代上游任務的輸出以在氣流中創建並行任務?

我的問題是:有沒有更習慣性地使用氣流做到這一點的正確動態方法?我在這裏感到笨拙和脆弱。我如何直接將一個有效的ID列表從一個進程傳遞到定義後續下游進程?

from datetime import datetime, timedelta 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH') 
if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import dash_workers 
else: 
    print('Define DASH_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

default_args = { 
    'start_date': datetime.now(), 
    'schedule_interval': None 
} 

DAG = DAG(
    dag_id='dash_preproc', 
    default_args=default_args 
) 

get_id_creds = PythonOperator(
    task_id='get_id_creds', 
    python_callable=dash_workers.get_id_creds, 
    provide_context=True, 
    dag=DAG) 

with open('/tmp/ids.txt', 'r') as infile: 
    ids = infile.read().splitlines() 

for uid in uids: 
    upload_transactions = PythonOperator(
     task_id=uid, 
     python_callable=dash_workers.upload_transactions, 
     op_args=[uid], 
     dag=DAG) 
    upload_transactions.set_downstream(get_id_creds) 
+1

檢查https://stackoverflow.com/questions/41517798/proper-way-to-create-dynamic-workflows-in-airflow –

+0

@JuanRiaza就是這樣,非常感謝。我最終使用的解決方案簡化了代碼,足以將我的解決方案作爲一個單獨的答案發布 - 我在當天結束時沒有任何對'Xcom'的需求 – Aaron

回答

1

考慮到Apache Airflow是一個工作流管理工具,即。它決定了用戶在比較中定義的任務(作爲例子)與作爲數據流管理工具的Apache Nifi之間的依賴關係,這裏的依賴關係是通過任務傳輸的數據。

也就是說,我認爲你的做法是正確的退出(我的意見的基礎上發佈的代碼)氣流提供了一個稱爲XCom概念。它允許任務通過傳遞一些數據在它們之間「交叉通信」。通過的數據應該多大?這取決於你測試!但通常情況下它不應該那麼大。我認爲它是以鍵值對的形式存儲在氣流元數據庫中的,也就是說,例如你不能傳遞文件,但是帶有ID的列表可以工作。

就像我說你應該測試你的自我。我會很高興知道你的經驗。 Here是一個示例dag,它演示了使用XComhere是必要的文檔。乾杯!

+0

感謝您提供的反饋@sdikby。我昨天花了一些時間研究'Xcom',覺得我對於如何在任務之間交換數據有着強烈的概念性把握。在這種情況下,我沒有看到這適用於我正在尋找一種方法,在同一個DAG中的上游任務的輸出上創建任意數量的任務_based_。我想到我可以創建兩個DAG,一個用於定義任務參數,另一個用於執行,但這並不理想,因爲我失去了依賴關係。你是否同意'Xcom'不支持這個應用程序,或者我錯過了somehing? – Aaron

0

Per @Juan Riza的建議我查看了這個鏈接:Proper way to create dynamic workflows in Airflow。這是相當多的答案,但我能簡化的解決方案,以至於我以爲我會提供在這裏實現我自己修改的版本:

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH') 
if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import dash_workers 
else: 
    print('Define DASH_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

ENV = os.environ 

default_args = { 
    # 'start_date': datetime.now(), 
    'start_date': datetime(2017, 7, 18) 
} 

DAG = DAG(
    dag_id='dash_preproc', 
    default_args=default_args 
) 

clear_tables = PythonOperator(
    task_id='clear_tables', 
    python_callable=dash_workers.clear_db, 
    dag=DAG) 

def id_worker(uid): 
    return PythonOperator(
     task_id=uid, 
     python_callable=dash_workers.main_preprocess, 
     op_args=[uid], 
     dag=DAG) 

for uid in capone_dash_workers.get_id_creds(): 
    clear_tables >> id_worker(uid) 

clear_tables清理,將被重新構建成數據庫過程的結果。 id_worker是一個基於從get_if_creds返回的ID值數組動態生成新預處理任務的函數。任務ID只是相應的用戶ID,儘管它可能很容易成爲索引i,如上例所述。

注意這位位移運算符(<<)向後在我看來,作爲clear_tables任務應該先,但它是什麼,似乎在這種情況下進行工作。

相關問題