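Luigi Pipeline in S3
My initial files are in AWS S3. Could someone point out how I need to set this up in a Luigi Task?
I reviewed the documentation and found luigi.S3, but it is not clear to me what to do with it. I then searched the web and only found links from mortar-luigi and implementations built on top of luigi.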
UPDATE
After following the example provided by @matagus (I also created the ~/.boto file as suggested):
# coding: utf-8
import luigi
from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):
    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None
        for input in self.input():
            print("Doing something ...")
            with input.open('r') as f:
                for line in f:
                    result = 'This is a line'
        if result:
            out_file = self.output().open('w')
            out_file.write(result)
When I execute it, nothing happens:
DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task ProcessS3File() has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ProcessS3File() has status DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread
As you can see, the message "Doing something ..." is never printed. What is wrong?
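The error is in 'def requieres(self):'. It must be 'requires'. – matagus
Luigi checks that method to get the input files; since no 'requires' method is defined on the task, it gets an empty list. – matagus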
You are absolutely right! I am such a dunce! Thank you! – nanounanue
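The behaviour matagus describes can be reproduced without Luigi or S3. The sketch below is a simplified stand-in, not Luigi's actual code: `BaseTask`, `Misspelled`, `Correct` and `count_iterations` are hypothetical names, and the only assumption carried over from the comments is that the base class supplies a default `requires()` returning an empty list, from which the inputs are derived.

```python
class BaseTask:
    """Simplified stand-in for a task base class (assumption, not Luigi's code)."""

    def requires(self):
        # Default: a task that does not override requires() has no dependencies.
        return []

    def input(self):
        # Inputs are derived from whatever requires() returns.
        return ["output of " + dep for dep in self.requires()]


class Misspelled(BaseTask):
    def requieres(self):  # typo: the base class never calls this method
        return ["MyS3File"]


class Correct(BaseTask):
    def requires(self):  # correctly overrides the default
        return ["MyS3File"]


def count_iterations(task):
    """Count how many times a loop over task.input() would execute."""
    return sum(1 for _ in task.input())


if __name__ == "__main__":
    print(count_iterations(Misspelled()))
    print(count_iterations(Correct()))
```

With the misspelled method the loop body never runs, so nothing is printed and no output is written, yet the task still finishes without raising an error. That matches the log above, where ProcessS3File() is reported as DONE.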