2015-10-25

Luigi Pipeline in S3

My initial files are in AWS S3. Could someone point out how I need to set this up in a Luigi Task?

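I reviewed the documentation and found luigi.S3, but it's not clear to me what to do with it. Then I searched the web and only found links to mortar-luigi and implementations built on top of luigi.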

UPDATE

After following @matagus's example (I also created the ~/.boto file as suggested):

# coding: utf-8

import luigi

from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):

    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None

        for input in self.input():
            print("Doing something ...")
            with input.open('r') as f:
                for line in f:
                    result = 'This is a line'

        if result:
            out_file = self.output().open('w')
            out_file.write(result)

When I run it, nothing happens:

DEBUG: Checking if ProcessS3File() is complete 
INFO: Informed scheduler that task ProcessS3File() has status PENDING 
INFO: Done scheduling tasks 
INFO: Running Worker with 1 processes 
DEBUG: Asking scheduler for work... 
DEBUG: Pending tasks: 1 
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running ProcessS3File() 
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done  ProcessS3File() 
DEBUG: 1 running tasks, waiting for next task to finish 
INFO: Informed scheduler that task ProcessS3File() has status DONE 
DEBUG: Asking scheduler for work... 
INFO: Done 
INFO: There are no more tasks to run at this time 
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread 

As you can see, the message "Doing something..." is never printed. What's wrong?

The error is in 'def requieres(self):'. It must be 'requires'. – matagus

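Luigi calls that method to get the input files, and since your class has no 'requires' method, the default one is used and it returns an empty list. – matagus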

You're absolutely right! I'm such a dummy! Thanks! – nanounanue
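matagus's explanation can be illustrated with a toy model. The classes below are hypothetical, simplified stand-ins written for this illustration, not Luigi's source: a base class whose default requires() returns an empty list, and an input() that replaces every task in whatever requires() returns with that task's output(). Luigi performs an equivalent mapping internally; here plain path strings stand in for S3Target objects.

```python
# Toy model (hypothetical, NOT Luigi's source) of how input() follows requires().


class ToyTask:
    def requires(self):
        # Default used when a subclass does not override requires(): no dependencies.
        return []

    def output(self):
        return None

    def input(self):
        # Swap every task in the requires() structure for that task's output().
        return map_outputs(self.requires())


def map_outputs(struct):
    """Recursively replace tasks in a task / list / tuple / dict with their outputs."""
    if isinstance(struct, ToyTask):
        return struct.output()
    if isinstance(struct, dict):
        return {key: map_outputs(value) for key, value in struct.items()}
    if isinstance(struct, (list, tuple)):
        return type(struct)(map_outputs(value) for value in struct)
    return struct


class MyS3File(ToyTask):
    def output(self):
        # A plain string stands in for S3Target in this toy model.
        return 's3://my-bucket/19170205.txt'


class WithTypo(ToyTask):
    def requieres(self):  # misspelled: nothing ever calls this method
        return MyS3File()


class WithFix(ToyTask):
    def requires(self):
        return MyS3File()


print(WithTypo().input())  # [] -> the question's for-loop body never runs
print(WithFix().input())   # a single "target", not a list
```

Because input() mirrors the shape of requires(), fixing the typo also changes what the question's loop sees: requires() now returns one task, so self.input() is a single target rather than a list, and the 'for input in self.input():' loop should become a plain 'with self.input().open('r') as f:' block, as the answer does.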

Answer

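The key here is to define an external task that has no inputs and whose outputs are the files you already have living in S3. The Luigi docs mention this in Requiring another Task: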

Note that requires() can not return a Target object. If you have a simple Target object that is created externally you can wrap it in a Task class
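The idea behind that note can be sketched without S3 or Luigi. In the hypothetical ToyExternalTask below (a stand-in for luigi.ExternalTask, with local temporary files in place of S3 objects), a task that has no run() counts as complete exactly when its output already exists, so all a pipeline can do with it is check for it before running anything downstream.

```python
import os
import tempfile


class ToyExternalTask:
    """Hypothetical stand-in for luigi.ExternalTask (NOT Luigi's code).

    There is no run(): the data is produced outside the pipeline, so the
    task is complete exactly when its output already exists.
    """

    def __init__(self, path):
        self.path = path

    def output(self):
        return self.path

    def complete(self):
        return os.path.exists(self.output())


def missing_dependencies(deps):
    """Return the outputs of external dependencies that do not exist yet."""
    return [dep.output() for dep in deps if not dep.complete()]


with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, '19170205.txt')
    with open(present, 'w') as f:
        f.write('already uploaded\n')
    absent = os.path.join(tmp, 'not-yet.txt')  # never created

    missing = missing_dependencies([ToyExternalTask(present), ToyExternalTask(absent)])
    print(missing)  # only the path of the file that was never created
```

In the real pipeline the same logic means that if the S3 file behind MyS3File is missing, Luigi cannot run ProcessS3File; the file has to be put in the bucket by something outside Luigi.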

So basically you end up with something like this:

import luigi

# in later Luigi releases this module lives at luigi.contrib.s3
from luigi.s3 import S3Target

# placeholder: replace with your own module and processing function
from somewhere import do_something_with


class MyS3File(luigi.ExternalTask):

    def output(self):
        return S3Target('s3://my-bucket/path/to/file')


class ProcessS3File(luigi.Task):

    def requires(self):
        return MyS3File()

    def output(self):
        return S3Target('s3://my-bucket/path/to/output-file')

    def run(self):
        result = None
        # this will return a file stream that reads the file from your AWS S3 bucket
        with self.input().open('r') as f:
            result = do_something_with(f)

        # and then you write the result to the output target; the upload to S3
        # happens when the stream is closed, which the with-block guarantees
        with self.output().open('w') as out_file:
            # it'd be better to serialize this result before writing it to a file, but this is a pretty simple example
            out_file.write(result)
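The comment in run() suggests serializing the result before writing it. A minimal sketch of that, assuming the result is a JSON-compatible dict: JSON is just one possible choice, the helper names write_result/read_result are hypothetical, and io.StringIO stands in for the stream that self.output().open('w') returns so the example runs without S3.

```python
import io
import json


def write_result(result, stream):
    """Serialize `result` as JSON to an open, writable text stream."""
    json.dump(result, stream, sort_keys=True)
    stream.write('\n')


def read_result(stream):
    """Inverse of write_result: parse the JSON document back into Python objects."""
    return json.load(stream)


# In ProcessS3File.run() this would be:
#     with self.output().open('w') as out_file:
#         write_result(result, out_file)
buffer = io.StringIO()  # stand-in for the S3 output stream
write_result({'lines': 3, 'status': 'ok'}, buffer)
print(buffer.getvalue())  # {"lines": 3, "status": "ok"}

buffer.seek(0)
print(read_result(buffer) == {'lines': 3, 'status': 'ok'})  # True
```

A downstream task would then call read_result() on the stream from 'with self.input().open('r') as f:', so both ends of the pipeline agree on the format.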

UPDATE:

Luigi uses boto to read files from and write files to AWS S3, so for this code to work you need to provide your credentials in the boto config file ~/.boto (look for other possible config file locations here):

[Credentials] 
aws_access_key_id = <your_access_key_here> 
aws_secret_access_key = <your_secret_key_here> 
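The boto config file uses an INI-style layout, so Python's standard configparser can be used to sanity-check it before running the pipeline. A minimal sketch; the helper names are hypothetical, and it only checks that the two keys shown above are present in a [Credentials] section, it does not validate them against AWS.

```python
import configparser
import os

EXAMPLE_BOTO_CONFIG = """\
[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
"""


def parse_boto_credentials(parser):
    """Return (access_key_id, secret_access_key) from a parsed config, or None."""
    if not parser.has_section('Credentials'):
        return None
    section = parser['Credentials']
    key_id = section.get('aws_access_key_id')
    secret = section.get('aws_secret_access_key')
    if not key_id or not secret:
        return None
    return key_id, secret


def credentials_from_text(text):
    # interpolation=None: treat '%' in a value literally, not as a substitution marker.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return parse_boto_credentials(parser)


def credentials_from_file(path='~/.boto'):
    parser = configparser.ConfigParser(interpolation=None)
    # read() silently skips missing files and returns the list of files it did read.
    if not parser.read(os.path.expanduser(path)):
        return None
    return parse_boto_credentials(parser)


print(credentials_from_text(EXAMPLE_BOTO_CONFIG))
print(credentials_from_file() is not None)  # True once ~/.boto has both keys
```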

There are some problems with your code, could you fix them? (e.g. the 'return' in the first 'output' method should be 'return S3Target(...') – nanounanue

One more question: where should I provide the 'aws credentials'? – nanounanue

Done, I've updated my answer. – matagus