1
我試圖創建一些芹菜任務作爲類,但我有一些困難。這些類是:芹菜:自定義基類/子類基礎的任務不顯示在app.tasks
class BaseCeleryTask(app.Task):
def is_complete(self):
""" default method for checking if celery task has completed. """
# simply return result (since by default tasks return boolean indicating completion)
try:
return self.result
except AttributeError:
logger.error('Result not defined. Make sure task has run!')
return False
class MacroReportTask(BaseCeleryTask):
def run(self, params):
""" Override the default run method with signal factory run"""
# hold on to the factory
process = MacroCountryReport(params)
self.result = process.run()
return self.result
但是當我初始化程序,並檢查app.tasks(或運行工),應用程序似乎並沒有在它的註冊表上述這些任務。其他基於功能的任務(using app.task() decorator
)似乎已被記錄正常。
我運行上面的任務爲:
process = SignalFactoryTask()
process.delay(params)
芹菜工人錯誤以下消息: Received unregistered task of type None
。
我認爲我遇到的問題是:如何將自定義類添加到任務註冊表中,就像我使用常規基於函數的任務一樣?
感謝您的幫助!我必須測試,但會讓你知道它是否有效! – kri