我想學習和整合芹菜與燒瓶應用程序。我試圖在鏈接中提到的東西: Flask Celery不能從燒瓶應用程序導入芹菜對象
但我想要移動單獨模塊中的所有異步任務。因此,這裏是我的git倉庫,我想提出我的變化: Beacons Repo
所以我初始化我的芹菜對象beacons/__init__.py
:
from beacons.async import make_celery
mq_url = 'amqp://mq:[email protected]/localhost'
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL=mq_url,
CELERY_BACKEND=mq_url
)
log_handler = logging.StreamHandler()
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(Formatter(
'\n-------------------------------------\n'
'TIME: %(asctime)s \n'
'LEVEL: %(levelname)s \n'
'MESSAGE: %(message)s \n'
'FILE: %(pathname)s \n'
'LINE: %(lineno)d'
'\n-------------------------------------\n'))
app.logger.addHandler(log_handler)
app.register_blueprint(portal, url_prefix='/beacons')
app.register_blueprint(apis, url_prefix='/beacons/apis')
celery = make_celery(app)
__all__ = ['celery']
我make_celery
在beacons/async/__init__.py
如下:
from celery import Celery
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
我創建了另一個模塊beacons/tasks.py
即:
from beacons import celery
@celery.task()
def add(x, y):
return x + y
,我使用這個模塊中的我的意見beacons/portal/view/rest_apis.py
一個如下:
from beacons.tasks import add
@apis.route('/add', methods=['POST'])
def add_page():
x = request.form.get('x')
y = request.form.get('y')
task = add.delay(x, y)
return json_response(
{"id": task.id}, status_code=200)
當我嘗試運行芹菜工人和瓶應用程序,我得到這個錯誤:
from beacons import celery
ImportError: cannot import name celery
如何我可以將beacons
模塊中編寫的芹菜物體導入其子模塊或任務本身。完整的代碼請參考Git Repo。