0
我有一個python函數將數據從其他兩個表格加載到sql server表格中。使用luigi任務替換表格加載函數
def load_table(date1,date2):
strDate1 = date1.strftime('%m/%d/%Y')
strDate2 = date2.strftime('%m/%d/%Y')
stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
cnx1= connection string
self.curs=cnx1.cursor()
self.curs.execute(stmt)
self.curs.commit()
我想這個功能轉換成路易吉任務
遵照docs嘗試以下方法:
class Datetask(luigi.Task):
def output(self):
return luigi.DateParameter()
class loading(luigi.Task):
def requires(self):
return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}
def run(self):
date1 = dict['date1']
date2 = dict['date2']
strDate1 = date1.strftime('%m/%d/%Y')
strDate2 = date2.strftime('%m/%d/%Y')
stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
curs=cnx1.cursor()
curs.execute(stmt)
curs.commit()
curs.close()
當我嘗試和運行它,我得到的錯誤:
python -m luigi --module load_task loading --local-scheduler
DEBUG: Checking if loading() is complete
/usr/local/lib/python2.7/dist-packages/luigi/worker.py:305: UserWarning: Task loading() without outputs has no custom complete() method
is_complete = task.complete()
WARNING: Will not run loading() or any dependencies due to error in deps() method:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 697, in _add
deps = task.deps()
File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 572, in deps
return flatten(self._requires())
File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 544, in _requires
return flatten(self.requires()) # base impl
File "load_task.py", line 19, in requires
return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}
NameError: global name 'DateTask' is not defined
我在定義DateTask so erro r令我困惑。
而且,做所有的任務,需要有所有3 需要(),運行,輸出?
此外,是否有必要始終將輸出寫入文件?當涉及到使用路易吉所以希望任何輸入 全新
你有一個錯字在你的任務類定義:它應該是'DateTask',而不是'Datetask'(T必須大寫!) – matagus
除此之外,從DateTask輸出的方法必須返回路易吉'子類。目標'而不是參數。 – matagus
ohh..typo是愚蠢的..我讀了關於使用luigi.Target將其寫入本地文本文件。有沒有其他方式來訪問date1和date2? –