2016-06-09 87 views
2

我想這是在運行時註冊任務的最佳方式。我workfflow如下:芹菜任務的動態註冊

  • 開始芹菜應用
  • 開始Python應用程序
  • 的Python應用程序創建一個新的任務,我想在芹菜

回答

2

我已經完成的方式來安排是基於與「click package has with custom subcommands」具有相同想法的「插件」概念。

該應用結構(基於蟒3):

. 
├── dynamic_tasks.py 
├── run.py 
└── tasks 
    └── get_rate.py 

芹菜任務dynamic_tasks.py被定義爲以下:

import os 
import celery 

app = celery.Celery('dynamic_tasks', broker='amqp://[email protected]/', backend='rpc://') 

PLUGIN_FOLDER = os.path.join(os.path.dirname(__file__), 'tasks') 
def _absolutepath(filename): 
    """ Return the absolute path to the filename""" 
    return os.path.join(PLUGIN_FOLDER, filename) 

@app.task 
def tasks(funcname, *args, **kwargs): 
    try: 
     funcname = funcname.replace('-', '_') 
     funcname += '.py' 
     func = _absolutepath(funcname) 
     ns = {} 
     with open(func) as f: 
      code = compile(f.read(), func, 'exec') 
      eval(code, ns, ns) 
     return ns['task'](*args, **kwargs) 
    except IOError as e: 
     # Manage IOError 
     raise e 

的可插拔任務示例任務/get_rate.py

""" This task get the currency rate between a pair of currencies """  
import urllib.request 

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p' 

def task(pair='EURSEK', url_tmplt=URL): 
    with urllib.request.urlopen(url_tmplt.format(pair)) as res: 
     body = res.read() 
    return (pair, float(body.strip())) 

而且,簡單地說,從run.py運行示例:

from dynamic_tasks import tasks 

print(tasks.delay('get_rate', 'EURSEK').get()) 

EDITED 既然型動物的機器上運行芹菜,不可能依靠本地文件系統。我的新方法是發送函數作爲字符串執行:

@app.task 
def dynamic_tasks(funcname, funccode, *args, **kwargs): 
    try: 
     ns = {} 
     code = compile(funccode, funcname, 'exec') 
     eval(code, ns, ns) 
     logger.info('execute %r with args %r, %r', funcname, args, kwargs) 
     return ns['task'](*args, **kwargs) 
    except IOError: 
     logger.error("Error loading the dynamic function from text %s", funcname)