2017-05-04 32 views
1

今天,我所寫的機器學習工作正在手工完成。我下載需要的輸入文件,學習和預測事情,輸出一個.csv文件,然後將其複製到數據庫中。經常性機器學習使用Luigi的ETL

但是,由於這是投入生產,我需要自動化所有這些過程。所需的輸入文件將每個月(並最終更頻繁)到達提供商的S3存儲桶。

現在我打算使用Luigi來解決這個問題。這裏是理想的過程:

  • 每週(或天或小時,無論我感覺比較好),我需要我的程序看S3存儲新的文件
  • 當文件到達後,我的機器學習管道被激發,並吐出一些熊貓數據幀。
  • 在那之後,我需要我的程序寫這些成果轉化爲不同的數據塊

的問題是,我不知道怎麼用路易吉自動化:

  1. 文件看
  2. 計劃任務(例如每個月)
  3. 部署它(可重複的方式)

今天,這裏是管道骨架,我有幾點:

import luigi 

from mylib import ml_algorithm 
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done 

class Extract(luigi.Task): 
    date = luigi.DateParameter() 
    s3_path = luigi.Parameter() 
    filename = luigi.Parameter() 
    def requires(self): 
     pass 
    def output(self, filename): 
     luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename) 
    def run(self): 
     data = read_s3(s3_path + '/' + file) 
     with self.output.open('w') as hdfs_file: 
      write_hdfs(hdfs_file, data) 


class Transform(luigi.Task): 
    date = luigi.DateParameter() 
    s3_path = luigi.Parameter() 
    filename = luigi.Parameter() 
    def requires(self): 
     return Extract(self.date, self.s3_path, self.filename) 
    def output(self, filename): 
     luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename) 
    def run(self): 
     with self.input().open('r') as inputfile: 
      data = read_hdfs(inputfile) 
     result = ml_algorithm(data) 
     with self.output().open('w') as outputfile: 
      write_hdfs(outputfile, result) 
     mark_as_done(filename) 



class Load(luigi.Task): 
    date = luigi.DateParameter() 
    s3_path = luigi.Parameter() 
    def requires(self): 
     return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)] 
    def output(self): 
     # Fake DB target, just for illustrative purpose 
     luigi.hdfs.DBTarget('...') 
    def run(self): 
     for input in self.input(): 
      with input.open('r') as inputfile: 
       result = read_hdfs(inputfile) 
      # again, just for didatic purposes 
      db = self.output().connection 
      write_db(db, result) 

然後我想補充一點,以crontab然後簡單地包裝成一個泊塢窗容器。

問題:

  • 這是人們用來做正確的模式?有沒有更好的方法來做到這一點?
  • 如果我有Transform1(取決於輸入數據)和Transform2(取決於Transform1結果),並想救兩個結果爲不同的數據塊,這怎麼可能用路易吉管道(也看的這種情況下一個實施文件)?
  • 人們會使用與cron不同的東西嗎?
  • 如何正確裝箱?
+0

只是爲了澄清,多步ETL的一個例子是:輸入數據是用戶表,其中一些特徵的每個用戶,在第一個轉換可以填充包含缺失值的列,和第二變換可以是聚類的用戶。我想保存已填充的表格和集羣。 – prcastro

回答

2

您的模式看起來基本正確。我將首先使用cron作業來調用觸發Load任務管道的腳本。看起來這個Load任務已經驗證了S3存儲桶中新文件的存在,但是您必須將輸出更改爲有條件的,這可能是一個狀態文件或其他無關的東西。您也可以在更高級別WrapperTask(無輸出)中執行此操作,僅在需要新文件時才需要Load任務。然後,您可以使用WrapperTask來要求兩個不同的加載任務,並分別要求您的Transform1Transform2

在容器中添加......我的cron真正調用的是一個腳本,它從git中提取我的最新代碼,在必要時構建一個新的容器,然後調用docker run。我有另一個始終運行的容器luigid。每日搬運工運行執行使用CMD調用所需要的那一天的參數路易吉任務容器中的shell腳本。

+0

但是如果Transform1取決於Transform2?這樣我就不能使用一個包裝任務來調用它們,因爲它們是依賴的。我也不明白如何有條件地改變輸出。 – prcastro

+0

您可以添加Transform2到'需要()'Transform1的。即使WrapperTask同時需要,Luigi也會找出正確的圖表。使輸出有條件是有點冒險,可能只是有WrapperTask。 – MattMcKnight