這是我的previous question的後續操作,關於ETL過程中經常使用的模式。Luigi任務不會觸發ETL過程的要求
今天,我寫的機器學習工作正在運行。我下載需要的輸入文件,學習和預測事情,輸出一個.csv文件,然後將其複製到數據庫中。
但是,由於這是投入生產,我需要自動化所有這些過程。所需的輸入文件將每個月(並最終更頻繁)到達提供商的S3存儲桶。作爲一個例子,我試圖在Luigi中實現這一點,但將S3更改爲本地目錄,所以事情會更簡單。這個程序應該
- 關注新文件
- 當一些新的文件中找到輸入目錄,由
Transform
任務(使用algorithm
功能),它由Extract
任務提取數據目錄 - 過程
- 它加載到PostgreSQL數據庫的
Load
任務
個
import glob
import luigi
from luigi.contrib import postgres
import pandas as pd
class ReadFile(luigi.ExternalTask):
# Simply load the new file from input directory
filename = luigi.Parameter()
def output(self):
return luigi.hdfs.LocalTarget('input/' + self.filename)
class Extract(luigi.Task):
# Extract from input directory and put in the data directory
filename = luigi.Parameter()
def requires(self):
return ReadFile(self.filename)
def output(self):
return luigi.hdfs.LocalTarget('data/' + self.filename)
def run(self):
with self.input().open('r') as input_file:
data = input_file.read()
with self.output().open('w') as output_file:
write(output_file, data)
class Transform(luigi.Task):
# Transform the file from data directory using the transform function
filename = luigi.Parameter()
def requires(self):
return Extract(self.filename)
def output(self, filename):
return luigi.hdfs.LocalTarget('results/' + self.filename)
def run(self):
with self.input().open('r') as input_file:
data = input_file.read()
result = trasnform(data)
with self.output().open('w') as output_file:
result.to_csv(output_file)
mark_as_done(self.filename)
class Load(luigi.Task):
# Find new files, run the Transform function and load into the PostgreSQL DB
date = luigi.DateParameter()
def requires(self):
return [Transform(filename) for filename in new_files('input/')]
def output(self):
return postgres.PostgresTarget(host='db', database='luigi', user='luigi', password='luigi', table='test', update_id=self.date)
def run(self):
for input in self.input():
with input.open('r') as inputfile:
result = pd.read_csv(inputfile)
connection = self.output().connect()
for row in result.itertuples():
cursor = connection.cursor()
cursor.execute('INSERT INTO test VALUES (?,?)', row)
# Get connection to the SQLite DB, which will store the files that were already processed
SQLITE_CONNECTION = None
def get_connection():
if SQLITE_CONNECTION is None:
SQLITE_CONNECTION = sqlite3.connect('processed.db')
return SQLITE_CONNECTION
# Mark filename as done in the SQLite DB
def mark_as_done(filename):
connection = get_connection()
cursor = connection.cursor()
cursor.execute('INSERT INTO processed_files VALUES (?)', (filename,))
# Check of the file were already processed
def new_file(filename):
connection = get_connection()
cursor = connection.cursor()
cursor.execute('SELECT * FROM processed_files WHERE file=%s', (filename,))
return cursor.rowcount == 0
# Yields filenames of files that were not processed yet
def new_files(path):
for filename in glob.glob(path + '*.csv'):
if new_file(filename):
yield filename
# Mock of the transform process
def trasnform(data):
return pd.DataFrame({'a': [1,2,3], 'b': [1,2,3]})
問題:
- 當我把一個文件輸入目錄,並觸發負載任務,它不火的提取任務。我究竟做錯了什麼?
- 負載是否每次都會使用這個
update_id
參數激發?
爲什麼代碼使用4個空格縮進並且沒有顯示爲格式化代碼? – prcastro
因爲它被假定爲第4項中的塊的一部分。 – wildplasser
謝謝,那就解釋一下吧 – prcastro