2017-05-05

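This is a follow-up to my previous question about a pattern commonly used in ETL processes. This time the problem is that a Luigi task does not trigger the requirements of the ETL process.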

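Right now, the machine learning job I wrote is working: I download the required input files, train and predict, write the results to a .csv file, and then copy that file into a database.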

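However, since this is going into production, I need to automate the whole process. The required input files will arrive in the provider's S3 bucket every month (and eventually more often). As an example, I tried to implement this in Luigi, but with S3 replaced by a local directory to keep things simpler. The program should: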

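  • Watch for new files in the input directory
  • When a new file is found in the input directory, extract it to the data directory with the Extract task
  • Process it with the Transform task (using the algorithm function)
  • Load it into the PostgreSQL database with the Load task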
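The bookkeeping behind the first step (knowing which files are new) can be sketched on its own, independent of Luigi. This is a minimal sketch, assuming a SQLite table named processed_files with a single file column, mirroring the table the question's helpers query; the helper names open_tracker, is_new, mark_done and pending are hypothetical. One detail worth knowing: in Python's sqlite3 module, cursor.rowcount is -1 for SELECT statements, so the check looks for a returned row instead of counting rows.

```python
import sqlite3


def open_tracker(path=":memory:"):
    """Open (or create) the table that records which input files were processed."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS processed_files (file TEXT PRIMARY KEY)")
    return conn


def is_new(conn, filename):
    """Return True if filename has not been recorded yet.

    sqlite3 reports rowcount == -1 for SELECT, so check for a returned row instead.
    """
    row = conn.execute(
        "SELECT 1 FROM processed_files WHERE file = ?", (filename,)
    ).fetchone()
    return row is None


def mark_done(conn, filename):
    """Record filename as processed; recording the same name twice is harmless."""
    conn.execute("INSERT OR IGNORE INTO processed_files (file) VALUES (?)", (filename,))
    conn.commit()


def pending(conn, candidates):
    """Return the candidates that still need processing, in their original order."""
    return [name for name in candidates if is_new(conn, name)]
```

For example, after mark_done(conn, "2017-04.csv"), calling pending(conn, ["2017-04.csv", "2017-05.csv"]) returns ["2017-05.csv"].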

import glob
import sqlite3

import luigi
from luigi.contrib import postgres
import pandas as pd


class ReadFile(luigi.ExternalTask):
    # Represents a new file that appears in the input directory
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('input/' + self.filename)


class Extract(luigi.Task):
    # Extract from the input directory and put the file in the data directory
    filename = luigi.Parameter()

    def requires(self):
        return ReadFile(self.filename)

    def output(self):
        return luigi.LocalTarget('data/' + self.filename)

    def run(self):
        with self.input().open('r') as input_file:
            data = input_file.read()
        with self.output().open('w') as output_file:
            output_file.write(data)


class Transform(luigi.Task):
    # Transform the file from the data directory using the transform function
    filename = luigi.Parameter()

    def requires(self):
        return Extract(self.filename)

    def output(self):
        return luigi.LocalTarget('results/' + self.filename)

    def run(self):
        with self.input().open('r') as input_file:
            data = input_file.read()
        result = trasnform(data)
        with self.output().open('w') as output_file:
            # index=False so Load reads back only the data columns
            result.to_csv(output_file, index=False)
        mark_as_done(self.filename)


class Load(luigi.Task):
    # Find new files, run the Transform task on each and load the results into PostgreSQL
    date = luigi.DateParameter()

    def requires(self):
        return [Transform(filename) for filename in new_files('input/')]

    def output(self):
        return postgres.PostgresTarget(host='db', database='luigi', user='luigi',
                                       password='luigi', table='test',
                                       update_id=str(self.date))

    def run(self):
        connection = self.output().connect()
        cursor = connection.cursor()
        for target in self.input():
            with target.open('r') as input_file:
                result = pd.read_csv(input_file)
            for row in result.itertuples(index=False):
                # psycopg2 uses %s placeholders, not ?
                cursor.execute('INSERT INTO test VALUES (%s, %s)', tuple(row))
        # Record update_id in the marker table so Load counts as complete
        self.output().touch(connection)
        connection.commit()
        connection.close()

# Get connection to the SQLite DB, which stores the names of files already processed
SQLITE_CONNECTION = None


def get_connection():
    global SQLITE_CONNECTION
    if SQLITE_CONNECTION is None:
        SQLITE_CONNECTION = sqlite3.connect('processed.db')
        SQLITE_CONNECTION.execute(
            'CREATE TABLE IF NOT EXISTS processed_files (file TEXT PRIMARY KEY)')
    return SQLITE_CONNECTION


# Mark filename as done in the SQLite DB
def mark_as_done(filename):
    connection = get_connection()
    cursor = connection.cursor()
    cursor.execute('INSERT INTO processed_files VALUES (?)', (filename,))
    connection.commit()


# Check whether the file was already processed
def new_file(filename):
    connection = get_connection()
    cursor = connection.cursor()
    # sqlite3 uses ? placeholders, and rowcount is -1 for SELECT, so fetch a row instead
    cursor.execute('SELECT 1 FROM processed_files WHERE file = ?', (filename,))
    return cursor.fetchone() is None


# Yields filenames of files that were not processed yet
def new_files(path):
    for filename in glob.glob(path + '*.csv'):
        if new_file(filename):
            yield filename


# Mock of the transform process 
def trasnform(data): 
    return pd.DataFrame({'a': [1,2,3], 'b': [1,2,3]}) 

Questions:

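  • When I put a file in the input directory and trigger the Load task, it does not fire the Extract task. What am I doing wrong?
  • Will Load run every time, given this update_id parameter?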
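On the second question: luigi's PostgresTarget treats a task as complete once its update_id has been recorded in a marker table (table_updates by default), which happens when touch() is called. The following is a plain-Python model of that idea, with SQLite standing in for PostgreSQL; it is not luigi's code, and MarkerTable, exists, touch and run_load are hypothetical stand-ins chosen for illustration.

```python
import sqlite3
from datetime import date


class MarkerTable:
    """Toy stand-in for a marker table that remembers which update_ids have run."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE table_updates (update_id TEXT, target_table TEXT)")

    def exists(self, update_id):
        # The target counts as complete once its update_id has been recorded
        row = self.conn.execute(
            "SELECT 1 FROM table_updates WHERE update_id = ? LIMIT 1", (update_id,)
        ).fetchone()
        return row is not None

    def touch(self, update_id, table):
        self.conn.execute("INSERT INTO table_updates VALUES (?, ?)", (update_id, table))
        self.conn.commit()


def run_load(markers, day, table="test"):
    """Simulate triggering Load for one date; return True only if it actually ran."""
    update_id = str(day)  # e.g. '2017-05-05', mirroring update_id=self.date
    if markers.exists(update_id):
        return False  # already complete, so the scheduler would skip it
    # ... the rows would be inserted here ...
    markers.touch(update_id, table)
    return True


if __name__ == "__main__":
    markers = MarkerTable()
    print(run_load(markers, date(2017, 5, 5)))  # True: first run for this date
    print(run_load(markers, date(2017, 5, 5)))  # False: same update_id, skipped
    print(run_load(markers, date(2017, 5, 6)))  # True: new date, new update_id
```

With update_id tied to the date, Load runs at most once per date: a second trigger on the same day is treated as already complete, even if new files arrived in between. A related pitfall: luigi's Task.input() is computed by calling requires() again, so a requires() that reads the processed-files table can return a different list inside run() (for example, an empty one once Transform has marked the files as done) than it did when the scheduler checked dependencies.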

Why is the code, indented with 4 spaces, not displayed as formatted code? – prcastro


Because it is taken to be part of the block in list item 4. – wildplasser


Thanks, that explains it. – prcastro

Answer


The Load task only creates Transform tasks for files in the result/ directory. Shouldn't it be looking for new files in the input directory?


That's not the problem; the requirements still aren't triggered. It was a typo I made when renaming the real folders I use. – prcastro
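One more thing worth checking for the first question: glob.glob('input/' + '*.csv') returns paths that already include the input/ prefix (e.g. input/2017-05.csv), while ReadFile builds its target as 'input/' + filename. The ExternalTask would then point at input/input/2017-05.csv, which never exists, and Luigi cannot run Extract while that external dependency is missing. Below is a minimal sketch of a new_files variant that yields bare file names instead; the name new_basenames and its is_new callback parameter are hypothetical.

```python
import glob
import os


def new_basenames(path, is_new):
    """Yield bare names (no directory part) of .csv files in path that is_new accepts."""
    for full_path in sorted(glob.glob(os.path.join(path, "*.csv"))):
        # glob returns e.g. 'input/2017-05.csv'; strip the directory so that
        # ReadFile's 'input/' + filename resolves to the real file
        name = os.path.basename(full_path)
        if is_new(name):
            yield name
```

In the question's Load.requires() this would be used as [Transform(name) for name in new_basenames('input/', new_file)], so the names stored by mark_as_done and checked by new_file are the same bare names that ReadFile, Extract and Transform receive.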