2016-12-06 25 views
1

我試圖創建一些芹菜任務作爲類,但我有一些困難。這些類是:芹菜:自定義基類/子類基礎的任務不顯示在app.tasks

class BaseCeleryTask(app.Task): 

def is_complete(self): 
    """ default method for checking if celery task has completed. """ 
    # simply return result (since by default tasks return boolean indicating completion) 

    try: 
     return self.result 
    except AttributeError: 
     logger.error('Result not defined. Make sure task has run!') 
     return False 


class MacroReportTask(BaseCeleryTask): 

def run(self, params): 
    """ Override the default run method with signal factory run""" 
    # hold on to the factory 
    process = MacroCountryReport(params) 
    self.result = process.run() 
    return self.result 

但是當我初始化程序,並檢查app.tasks(或運行工),應用程序似乎並沒有在它的註冊表上述這些任務。其他基於功能的任務(using app.task() decorator)似乎已被記錄正常。

我運行上面的任務爲:

process = SignalFactoryTask() 
process.delay(params) 

芹菜工人錯誤以下消息: Received unregistered task of type None

我認爲我遇到的問題是:如何將自定義類添加到任務註冊表中,就像我使用常規基於函數的任務一樣?

回答

2

跑到完全相同的問題,花了幾個小時找到解決方案,因爲我90%肯定這是一個錯誤。在你的類的任務,請嘗試以下

class BaseCeleryTask(app.Task): 

    def __init__(self): 
     self.name = "[modulename].BaseCeleryTask" 

class MacroReportTask(app.Task): 

    def __init__(self): 
     self.name = "[modulename].MacroReportTask" 

它似乎與應用程序註冊它仍然具有這樣的名稱不會自動配置的錯誤。讓我知道這是否有效。

+0

感謝您的幫助!我必須測試,但會讓你知道它是否有效! – kri