芹菜文檔mentions testing Celery within Django但不解釋如何測試芹菜任務,如果你不使用Django。你怎麼做到這一點?你如何測試芹菜任務?
回答
可以使用任何unittest lib在那裏同步測試任務。在使用芹菜任務時,我通常會做2個不同的測試會話。第一個(正如我所建議的那樣)是完全同步的,應該是確保算法完成它應該做的工作。第二場會議使用整個系統(包括經紀人),並確保我沒有序列化問題或任何其他分佈,通信問題。
所以:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
而且測試:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)
希望幫助!
除了使用HttpDispatchTask的任務外, http://docs.celeryproject.org/en/latest/userguide/remote-tasks.html我必須設置celery.conf.CELERY_ALWAYS_EAGER = True,但即使同時設置了celery.conf.CELERY_IMPORTS =('celery.task。 http')測試失敗NotRegistered:celery.task.http.HttpDispatchTask – DavidM
奇怪,你確定你沒有一些導入問題?這[測試](http://pastebin.com/TXkZxb6t)的作品(注意,我假裝的響應,所以它返回芹菜期望)。此外,CELERY_IMPORTS中定義的模塊將在[工作程序初始化]期間導入(https://github.com/celery/celery/blob/master/celery/loaders/base.py#L107),爲了避免這種情況,我建議你可以調用'celery.loader.import_default_modules()'。 – FlaPer87
我也建議你看看[這裏](https://github.com/celery/celery/blob/master/celery/tests/tasks/test_http.py)。它嘲笑http請求。不知道它是否有幫助,我想你想測試一個正在運行的服務,不是嗎? – FlaPer87
取決於你想要測試的是什麼。
- 直接測試任務代碼。不要調用「task.delay(...)」,只需在單元測試中調用「task(...)」即可。
- 使用CELERY_ALWAYS_EAGER。這會導致你的任務在你說「task.delay(...)」時被立即調用,因此你可以測試整個路徑(但不是任何異步行爲)。
CELERY_ALWAYS_EAGER鏈接在你的答案現在已經死了。 –
Celery 4.0+的文檔鏈接在這裏:http://docs.celeryproject.org/en/v4.1.0/userguide/configuration.html#task-always-eager – krassowski
我用這個:
with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
...
文檔:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER讓你運行你的任務同步,你不需要芹菜服務器。
作爲芹菜3.0,設置在CELERY_ALWAYS_EAGER
Django的單程的是:
from django.test import TestCase, override_settings
from .foo import foo_celery_task
class MyTest(TestCase):
@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())
不要以這種方式修改django設置。它會對所有其他測試產生副作用。 CELERY_ALWAYS_EAGER在測試之後不會恢復,因此它將在所有下列測試中設置爲True。請參閱https://docs.djangoproject.com/en/1.9/topics/testing/tools/#overriding-settings 使用@override_settings(CELERY_ALWAYS_EAGER = True)而不是 – ornoone
@ornoone您是正確的。我更新了我的答案。感謝您的反饋意見。 –
要保存額外的谷歌搜索,你需要的導入是:'from django.test import override_settings' – Marshall
import unittest
from myproject.myapp import celeryapp
class TestMyCeleryWorker(unittest.TestCase):
def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
py.test夾具
# conftest.py
from myproject.myapp import celeryapp
@pytest.fixture(scope='module')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp
# test_tasks.py
def test_some_task(celery_app):
...
附錄:使send_ta SK尊重渴望
from celery import current_app
def send_task(name, args=(), kwargs={}, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)
current_app.send_task = send_task
對於那些對芹菜4是:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
由於設置名稱已更改,且需要的,如果你選擇升級更新,請參閱
http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names
根據[官方文檔](http://docs.celeryproject.org/en/latest/userguide/testing.html),使用「task_always_eager」(早期的「CELERY_ALWAYS_EAGER」)不適合單元測試。相反,他們提出了其他一些很好的方法來測試你的Celery應用程序。 – krassowski
- 1. 單元測試芹菜任務直接
- 2. 如何鏈芹菜任務
- 3. 芹菜任務
- 4. 芹菜任務
- 5. 如何在django上測試時禁用芹菜任務
- 6. 芹菜鏈任務
- 7. 推芹菜任務
- 8. 芹菜任務重試(芹菜,Django和RabbitMQ)
- 9. 測量芹菜任務執行時間
- 10. 檢測芹菜任務和所有子任務何時完成
- 11. Python芹菜 - 如何在其他任務中調用芹菜任務
- 12. 如何從先前的芹菜任務中產生芹菜任務?
- 13. 芹菜工人不重試任務()
- 14. 重試芹菜'send_task'發送的任務
- 15. 重試與芹菜Http回調任務
- 16. 重試芹菜任務通過task_id
- 17. 芹菜任務不會重試
- 18. Django /芹菜 - 如何殺死芹菜任務?
- 19. 如何回覆沒有芹菜的芹菜任務?
- 20. 如何避免酸洗芹菜任務?
- 21. 如何刪除芹菜任務?
- 22. 我如何配置芹菜任務
- 23. 芹菜 - 如何記錄父任務ID?
- 24. 如何管理芹菜中的任務?
- 25. 芹菜如何實際執行任務?
- 26. 如何獲取芹菜任務狀態?
- 27. 如何構建芹菜任務
- 28. 芹菜:如何關閉任務確認?
- 29. 從django單元測試中獲取芹菜任務列表
- 30. 無法在Django測試中同步調用芹菜任務
的鏈接的文檔已經消失,似乎並沒有在當前的Celery 3.1文檔中取代。 – keturn
@打開[使用Celery 2.5進行單元測試]的文檔(http://docs.celeryproject.org/en/2.5/django/unit-testing.html)仍然可用 – Merwan
鏈接到Django文檔已損壞。 – sheldonkreger