氣流

2017-06-27 124 views
22

設置S3的日誌我使用泊塢窗,撰寫建立一個可擴展的氣流集羣。我根據我的方法關閉這個Dockerfile https://hub.docker.com/r/puckel/docker-airflow/氣流

我的問題越來越設置了可寫/從S3讀取日誌。當DAG完成我得到這樣

*** Log file isn't local. 
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00 
*** Failed to fetch log file from worker. 

*** Reading remote logs... 
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06- 
26T11:00:00 

錯誤我設立在airflow.cfg文件中的新節這樣

[MyS3Conn] 
aws_access_key_id = xxxxxxx 
aws_secret_access_key = xxxxxxx 
aws_default_region = xxxxxxx 

然後在airflow.cfg在遠程日誌部分指定的S3路徑

remote_base_log_folder = s3://buckets/xxxx/airflow/logs 
remote_log_conn_id = MyS3Conn 

我是否正確設置了它,並且有錯誤?在這裏,我有失蹤的成功祕訣嗎?

- 更新

我試着在URI和JSON格式導出既不似乎工作。然後我輸出aws_access_key_id和aws_secret_access_key,然後氣流開始採集它。現在,我讓他在工人錯誤日誌

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3 
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00 
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00 
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00 

- 更新

我發現這個鏈接以及 https://www.mail-archive.com/[email protected]/msg00462.html

然後我炮轟了我的工人一臺機器(從網絡服務器分開和調度程序)並在python中運行這一位代碼

import airflow 
s3 = airflow.hooks.S3Hook('s3_conn') 
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder')) 

我收到此錯誤消息。

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden 

我嘗試出口幾種不同類型的AIRFLOW_CONN_ ENVS作爲連接部分https://airflow.incubator.apache.org/concepts.html和其他回答這個問題解釋在這裏。

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3 

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"} 

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"} 

我還遠銷AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY沒有成功。

這些憑據存儲在數據庫中,以便有一次我在他們應該由工人拿起UI添加他們,但他們不能寫/出於某種原因讀取日誌。

+0

在這一點上,我會採取任何戰略,以獲得登錄工作。我無法在本地獲取它們,在s3上,或使用rfs – JackStat

+0

文件夾'logs'是否存在於路徑中?如果文件夾存在,至少本地日誌應該沒有任何問題。如果他們甚至不能在本地工作,我唯一能想到的其他原因是氣流文件夾上的權限不正確。 – Him

+0

我們可能會在這裏有一些.https://github.com/puckel/docker-airflow/pull/100 – JackStat

回答

11

您需要設置通過氣流UI S3的連接。爲此,您需要轉到氣流UI上的Admin - > Connections選項卡,併爲您的S3連接創建一個新行。

一個例子配置是:

Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

+1

好的,我也會嘗試。與它的問題是,我們已經dockerized所有東西,所以每次我們升級圖像,我們將有一個手動步驟 – JackStat

+0

您可以通過dockerfile導出一個環境變量,通過氣流作爲連接參數 https://stackoverflow.com/a/44708691/2879084 – Him

+1

這些設置是否從UI工作?如果是,我可以添加更多關於自動配置的詳細信息。 – Him

1

這裏有一個解決方案,如果你不想使用管理UI。

我的部署過程Dockerized,我從來不碰管理界面。我還喜歡在bash腳本中設置Airflow專用環境變量,該腳本覆蓋.cfg文件。

氣流(S3)

首先,你需要安裝到您的氣流日誌寫入S3的s3分裝。 (boto3工作正常,爲您的DAG中Python的工作,但S3Hook依賴於S3子包)。

一個側面說明:暢達安裝doesn't handle this yet,所以我必須做的pip install airflow[s3]

環境變量

在bash腳本,我設置這些core變量。從these instructions開始,但使用的命名約定AIRFLOW__{SECTION}__{KEY}環境變量,我做的:

export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key 
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri 
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False 

S3連接ID

s3_uri的是,我做了一個連接ID。在Airflow中,它對應於另一個環境變量AIRFLOW_CONN_S3_URI。這個值是你的S3路徑,它必須是URI形式。那是

s3://access_key:[email protected]/key 

存儲這個,但是你處理其他敏感的環境變量。

使用此配置,Airflow會將您的日誌寫入S3。他們將遵循s3://bucket/key/dag/task_id的路線。

+0

這將仍然適用於氣流1.9.0? – Alex

10

注意:從Airflow 1.9.0開始,遠程登錄已被significantly altered

對於1.9+,按照步驟1-9 here獲取遠程日誌,用s3替換gcs。

編輯 - 完整的說明:

  1. 創建一個目錄來存放CONFIGS並將其放置,以便它可以在PYTHONPATH找到。一個例子是$ AIRFLOW_HOME /配置稱爲

  2. 創建空的文件$ AIRFLOW_HOME /配置/ log_config.py和 $ AIRFLOW_HOME /配置/ __ init__.py

  3. 複製的airflow/config_templates/airflow_local_settings.py內容到log_config.py文件這是剛剛在上面的步驟中創建的。

  4. 定製模板的以下部分:

    #Add this variable to the top of the file. Note the trailing slash. 
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/' 
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG 
    LOGGING_CONFIG = ... 
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable 
    's3.task': { 
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', 
        'formatter': 'airflow.task', 
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), 
        's3_log_folder': S3_LOG_FOLDER, 
        'filename_template': FILENAME_TEMPLATE, 
    }, 
    
    Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'. 
    'loggers': { 
        'airflow.task': { 
         'handlers': ['s3.task'], 
         ... 
        }, 
        'airflow.task_runner': { 
         'handlers': ['s3.task'], 
         ... 
        }, 
        'airflow': { 
         'handlers': ['console'], 
         ... 
        }, 
    } 
    
  5. 確保一個S3連接鉤已在氣流被定義,作爲每the above answer。掛鉤應該具有對S3_LOG_FOLDER中定義的s3存儲區的讀取和寫入權限。

  6. 更新$ AIRFLOW_HOME/airflow。CFG包含:

    task_log_reader = s3.task 
    logging_config_class = log_config.LOGGING_CONFIG 
    remote_log_conn_id = <name of the s3 platform hook> 
    
  7. 重新啓動氣流網絡服務器和調度,並觸發(或等待)新的任務執行。

  8. 驗證日誌是否顯示您定義的存儲桶中新執行的任務。

  9. 驗證s3存儲查看器是否在UI中工作。拉起一個新執行的任務,並確認您看到:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. 
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532 
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py'] 
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor 
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py 
    
+1

不應該是'$ AIRFLOW_HOME/config/__ init __。py.'嗎? – andresp

+0

哎呀,降價格式讓我。我已經編輯了它,謝謝! –

+1

還有另一個錯字's3TaskHandler'應該是'S3TaskHandler' – pyCthon