我有一個用Django編寫的REST API,其中有一個端點,在發佈它時排隊一個芹菜任務。該響應包含我想用來測試任務創建並獲得結果的任務ID。所以,我想這樣做:用於芹菜單元測試的內存中間代理
def test_async_job():
response = self.client.post("/api/jobs/", some_test_data, format="json")
task_id = response.data['task_id']
result = my_task.AsyncResult(task_id).get()
self.assertEquals(result, ...)
我顯然不希望有運行芹菜工人運行單元測試,我希望以某種方式嘲笑它。我無法使用CELERY_ALWAYS_EAGER,因爲這似乎完全繞過了代理,從而阻止我使用AsyncResult通過其id來獲取任務(如所述的here)。
通過芹菜和kombu docs,我發現有一個單元測試的內存傳輸,這將做我在找什麼。我試圖重寫BROKER_URL
設置來使用它的測試:
@override_settings(BROKER_URL='memory://')
def test_async_job():
但行爲是一樣的與ampq經紀人:它會阻止測試中等待結果。任何想法我該如何配置這個經紀人來讓它在測試中工作?
即使使用內存代理,您仍然需要一名工作人員。不幸的是,我不認爲你想要做什麼是可能的。您可能需要啓動一個worker來使用您的測試,或使用CELERY_ALWAYS_EAGER同步運行任務(在這種情況下,如您發現的那樣,您沒有獲得AsyncResult)。 – jrothenbuhler
爲什麼你需要通過ID來訪問任務?這看起來很適合單元測試。爲什麼不直接測試直接產生任務的函數,而是通過HTTP進行測試?這樣你就可以得到與'AsyncResult'具有相同API的'EagerResult'。 – patrys
[單元測試與Django芹菜?](http://stackoverflow.com/questions/4055860/unit-testing-with-django-celery) –