今天,我所寫的機器學習工作正在手工完成。我下載需要的輸入文件,學習和預測事情,輸出一個.csv文件,然後將其複製到數據庫中。經常性機器學習使用Luigi的ETL
但是,由於這是投入生產,我需要自動化所有這些過程。所需的輸入文件將每個月(並最終更頻繁)到達提供商的S3存儲桶。
現在我打算使用Luigi來解決這個問題。這裏是理想的過程:
- 每週(或天或小時,無論我感覺比較好),我需要我的程序看S3存儲新的文件
- 當文件到達後,我的機器學習管道被激發,並吐出一些熊貓數據幀。
- 在那之後,我需要我的程序寫這些成果轉化爲不同的數據塊
的問題是,我不知道怎麼用路易吉自動化:
- 文件看
- 計劃任務(例如每個月)
- 部署它(可重複的方式)
今天,這裏是管道骨架,我有幾點:
import luigi
from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done
class Extract(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
pass
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename)
def run(self):
data = read_s3(s3_path + '/' + file)
with self.output.open('w') as hdfs_file:
write_hdfs(hdfs_file, data)
class Transform(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
return Extract(self.date, self.s3_path, self.filename)
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename)
def run(self):
with self.input().open('r') as inputfile:
data = read_hdfs(inputfile)
result = ml_algorithm(data)
with self.output().open('w') as outputfile:
write_hdfs(outputfile, result)
mark_as_done(filename)
class Load(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
def requires(self):
return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)]
def output(self):
# Fake DB target, just for illustrative purpose
luigi.hdfs.DBTarget('...')
def run(self):
for input in self.input():
with input.open('r') as inputfile:
result = read_hdfs(inputfile)
# again, just for didatic purposes
db = self.output().connection
write_db(db, result)
然後我想補充一點,以crontab然後簡單地包裝成一個泊塢窗容器。
問題:
- 這是人們用來做正確的模式?有沒有更好的方法來做到這一點?
- 如果我有
Transform1
(取決於輸入數據)和Transform2
(取決於Transform1
結果),並想救兩個結果爲不同的數據塊,這怎麼可能用路易吉管道(也看的這種情況下一個實施文件)? - 人們會使用與cron不同的東西嗎?
- 如何正確裝箱?
只是爲了澄清,多步ETL的一個例子是:輸入數據是用戶表,其中一些特徵的每個用戶,在第一個轉換可以填充包含缺失值的列,和第二變換可以是聚類的用戶。我想保存已填充的表格和集羣。 – prcastro