2016-02-29 40 views
0

我想在Celery框架之上構建一個應用程序。芹菜任務馬上自動發現

我有一個模塊settings/celery_settings.py與初始化像這樣的芹菜應用程序的代碼(我展開了一些變量):

from __future__ import absolute_import 
from celery import Celery 

pfiles = ['other_tasks.test123', 'balance_log.balance_log'] 
app = Celery('myapp') 
# here I just have some parameters defined like broker, result backend, etc 
# app.config_from_object(settings) 

# TRYING to discover tasks 
app.autodiscover_tasks(pfiles) 

文件other_tasks/test123.pybalance_log/balance_log.py包含任務定義這樣的:

# file other_tasks/test123.py 
from celery import shared_task, Task 

@shared_task() 
def mytask(): 
    print("Test 1234!") 

class TestTask01(Task): 

    def run(self, client_id=None): 
     logger.debug("TestTask01: run") 
     return client_id 

我運行芹菜工:

python3 /usr/local/bin/celery -A settings.celery_settings worker 

而這種方式可以發現任務。我可以稱這些任務。

但後來我嘗試使用IPython的:

In [1]: from settings.celery_settings import app 

In [2]: app.tasks 
Out[2]: 
{'celery.backend_cleanup': <@task: celery.backend_cleanup of XExchange:0x7f9f50267ac8>, 
'celery.chain': <@task: celery.chain of XExchange:0x7f9f50267ac8>, 
'celery.chord': <@task: celery.chord of XExchange:0x7f9f50267ac8>, 
'celery.chord_unlock': <@task: celery.chord_unlock of XExchange:0x7f9f50267ac8>, 
'celery.chunks': <@task: celery.chunks of XExchange:0x7f9f50267ac8>, 
'celery.group': <@task: celery.group of XExchange:0x7f9f50267ac8>, 
'celery.map': <@task: celery.map of XExchange:0x7f9f50267ac8>, 
'celery.starmap': <@task: celery.starmap of XExchange:0x7f9f50267ac8>} 

而且很顯然它發現任務。

看來,當我明確地調用任務時,我首先導入它們並指定芹菜的確切路徑。這就是它工作的原因。

問:我如何讓他們發現有已知任務的列表?

回答

4

最後我想通了,那裏是autodiscover_tasks功能的附加參數:

def autodiscover_tasks(self, packages, related_name='tasks', force=False): 
    ... 

因此,設置force=True後它變成工作!

app.autodiscover_tasks(pfiles, force=True) 
1

這是我的示例配置:

的conf/celeryconfig

from conf import settings 

CELERYD_CHDIR='/usr/local/src/imbue/application/imbue' 
CELERY_IGNORE_RESULT = False 
CELERY_RESULT_BACKEND = "amqp" 
CELERY_TASK_RESULT_EXPIRES = 360000 
CELERY_RESULT_PERSISTENT = True 
BROKER_URL='amqp://<USERNAME>:<PASSWORD>@rabbitmq:5672' 
CELERY_ENABLE_UTC=True 
CELERY_TIMEZONE= "US/Eastern" 
CELERY_IMPORTS=("hypervisor.esxi.vm_operations", 
       "tools.deploy_tools",) 

管理程序/ ESXi的/ vm_operations.py

@task(bind=True, default_retry_delay=300, max_retries=5) 
def cancel_job(self, host_id=None, vm_id=None, job=None, get_job=False, **kwargs): 
    pass 

call_task.py

def call_task(): 
    log.info('api() | Sending task: ' + job_instance.reference)  
    celery = Celery() 
    celery.config_from_object('conf.celeryconfig') 
    celery.send_task("hypervisor.esxi.vm_operations.cancel_job", 
        kwargs={'job': job_instance, 
          'get_job': True}, 
        task_id=job_instance.reference) 

我用芹菜與導師,我從conf目錄下啓動:

source ~/.profile 
CELERY_LOGFILE=/usr/local/src/imbue/application/imbue/log/celeryd.log 
CELERYD_OPTS=" --loglevel=INFO --autoscale=10,5" 
cd /usr/local/src/imbue/application/imbue/conf 
exec celery worker -n [email protected]%h -f $CELERY_LOGFILE $CELERYD_OPTS 
+0

那麼如何獲得已知發現任務的列表? – baldr

+0

'from celery.task.control import inspect' 'i = inspect()。registered_tasks()' – felix