我的問題是,當我嘗試調用test()時,它工作得很好,但是當我調用test.delay()時,它返回「received unregistered task」。芹菜Redis返回「接收未註冊的任務類型」
運行試驗(+)和test.delay()(你可以看到,第一個作品。)從test.delay
結果()
Settings.py
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS=("tasks")
INSTALLED_APPS = (
"systech_account",
#...
)
tasks.py
from __future__ import absolute_import
from celery import shared_task
@shared_task
def test():
return "Just a Test"
celeryconfig.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'root.settings')
app = Celery()
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
。
我使用
- 的Django 1.9
- 芹菜4.0
- Redis的服務器2.8.4
- 的Python 2.7
- 的Ubuntu 14.04
感謝您的回覆。當你說「重啓Celery」時,你的意思是重新運行* celery --app = root.celery:app worker --loglevel = INFO *?如果是這樣,我已經做到了,但沒有運氣。至於Tasks.py,它只是一種類型。感謝您指出了這一點。 – aldesabido