
Passing Airflow parameters to dependent tasks

What is the way to pass parameters between dependent tasks in Airflow? I have many bash scripts and I am trying to migrate this approach to Airflow, but I don't know how to pass properties from one task to the next.

Here is a real example:

#sqoop bash template 
sqoop_template = """ 
     sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/ 
    """ 

s3_template = """ 
     s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}} 
    """ 



from datetime import datetime

#Task of extraction in EMR
t1 = BashOperator(
     task_id='extract_account',
     bash_command=sqoop_template,
     params={'job': 'job',
             'dir': 'hdfs:///account/' + datetime.now().strftime("%Y-%m-%d-%H-%M-%S")},
     dag=dag)

#Task to upload the backup to S3.
t2 = BashOperator(
     task_id='s3_upload',
     bash_command=s3_template,
     params={},  # here I need the dir name created in t1
     depends_on_past=True,
     dag=dag)

t2.set_upstream(t1) 

In t2 I need to access the directory name created in t1.

Solution

import boto3

#Execute a valid sqoop job; dirpath() and job_default_config() are helper
#functions (not shown) that build the HDFS/S3 paths and the sqoop command.
def sqoop_import(table_name, job_name):
    s3, hdfs = dirpath(table_name)
    sqoop_job = job_default_config(job_name, hdfs)
    #call(sqoop_job)
    #The returned dict is pushed to XCom as this task's return_value.
    return {'hdfs_dir': hdfs, 's3_dir': s3}

def s3_upload(**context):
    #Pull the directories returned by the 'sqoop_import' task.
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir']
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir']
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
    #call(s3_cpdist_job)
    return {'s3_dir': s3}

def sns_notify(**context):
    #Pull the S3 path from the 'distcp_s3' task and publish it to SNS.
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir']
    client = boto3.client('sns')
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'
    response = client.publish(TargetArn=arn, Message=s3)
    return response
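
For completeness, here is a minimal sketch of how these callables could be wired into the DAG. The original post does not show this part, so the operator wiring, the op_kwargs values, and the import paths (Airflow 1.x layout) are my assumptions:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('sqoop_to_s3', start_date=datetime(2016, 7, 25),
          schedule_interval=None)

extract = PythonOperator(
    task_id='sqoop_import',
    python_callable=sqoop_import,
    op_kwargs={'table_name': 'account', 'job_name': 'job'},  # assumed values
    dag=dag)

#provide_context=True passes the task context (and thus XCom access)
#into the callable as **context.
upload = PythonOperator(
    task_id='distcp_s3',
    python_callable=s3_upload,
    provide_context=True,
    dag=dag)

notify = PythonOperator(
    task_id='sns_notify',
    python_callable=sns_notify,
    provide_context=True,
    dag=dag)

upload.set_upstream(extract)
notify.set_upstream(upload)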

This is not a definitive solution, so improvements are welcome. Thanks.


It seems to me that one solution would be to create a file with the properties produced in t1 and read that file in t2.
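
A temp file works, but the two BashOperator tasks can also exchange the value through XCom directly: in Airflow 1.x, xcom_push=True makes a BashOperator push the last line of its stdout to XCom, and bash_command is a Jinja template that can pull it back with ti.xcom_pull. A minimal sketch under those assumptions (the bucket name and the ts_nodash-based path are hypothetical):

#t1 echoes the target dir as the last line of stdout so it lands in XCom.
t1 = BashOperator(
    task_id='extract_account',
    bash_command=('sqoop job --exec {{ params.job }} '
                  '-- --target-dir hdfs:///account/{{ ts_nodash }} --outdir /src/\n'
                  'echo hdfs:///account/{{ ts_nodash }}'),
    params={'job': 'job'},
    xcom_push=True,  # push the last line of stdout (do_xcom_push in Airflow 2)
    dag=dag)

#t2 pulls t1's pushed value inside its own templated command.
t2 = BashOperator(
    task_id='s3_upload',
    bash_command=("s3-dist-cp --src={{ ti.xcom_pull(task_ids='extract_account') }} "
                  "--dest={{ params.s3 }}"),
    params={'s3': 's3://my-bucket/account/'},  # hypothetical bucket
    dag=dag)

t2.set_upstream(t1)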

Answer