2014-03-06 56 views
20

我有一個用Django編寫的REST API,其中有一個端點,在發佈它時排隊一個芹菜任務。該響應包含我想用來測試任務創建並獲得結果的任務ID。所以,我想這樣做:用於芹菜單元測試的內存中間代理

def test_async_job(): 
    response = self.client.post("/api/jobs/", some_test_data, format="json") 
    task_id = response.data['task_id'] 
    result = my_task.AsyncResult(task_id).get() 
    self.assertEquals(result, ...) 

我顯然不希望有運行芹菜工人運行單元測試,我希望以某種方式嘲笑它。我無法使用CELERY_ALWAYS_EAGER,因爲這似乎完全繞過了代理,從而阻止我使用AsyncResult通過其id來獲取任務(如所述的here)。

通過芹菜和kombu docs,我發現有一個單元測試的內存傳輸,這將做我在找什麼。我試圖重寫BROKER_URL設置來使用它的測試:

@override_settings(BROKER_URL='memory://') 
def test_async_job(): 

但行爲是一樣的與ampq經紀人:它會阻止測試中等待結果。任何想法我該如何配置這個經紀人來讓它在測試中工作?

+3

即使使用內存代理,您仍然需要一名工作人員。不幸的是,我不認爲你想要做什麼是可能的。您可能需要啓動一個worker來使用您的測試,或使用CELERY_ALWAYS_EAGER同步運行任務(在這種情況下,如您發現的那樣,您沒有獲得AsyncResult)。 – jrothenbuhler

+0

爲什麼你需要通過ID來訪問任務?這看起來很適合單元測試。爲什麼不直接測試直接產生任務的函數,而是通過HTTP進行測試?這樣你就可以得到與'AsyncResult'具有相同API的'EagerResult'。 – patrys

+0

[單元測試與Django芹菜?](http://stackoverflow.com/questions/4055860/unit-testing-with-django-celery) –

回答

11

您可以指定你的設置broker_backend:

if 'test' in sys.argv[1:]: 
    BROKER_BACKEND = 'memory' 
    CELERY_ALWAYS_EAGER = True 
    CELERY_EAGER_PROPAGATES_EXCEPTIONS = True 

,或者您可以直接在您的測試

import unittest 
from django.test.utils import override_settings 


class MyTestCase(unittest.TestCase): 

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, 
         CELERY_ALWAYS_EAGER=True, 
         BROKER_BACKEND='memory') 
    def test_mytask(self): 
     ... 
4

可使用海帶內存經紀人覆蓋有裝飾的設置運行單元測試,但是要做到這一點,您需要使用與Django服務器相同的Celery應用程序對象來啓動Celery工作器。

要使用內存經紀人,設置BROKER_URL到memory://localhost/

然後,旋轉了一個小芹菜工人,你可以做到以下幾點:

app = <Django Celery App> 

# Set the worker up to run in-place instead of using a pool 
app.conf.CELERYD_CONCURRENCY = 1 
app.conf.CELERYD_POOL = 'solo' 

# Code to start the worker 
def run_worker(): 
    app.worker_main() 

# Create a thread and run the worker in it 
import threading 
t = threading.Thread(target=run_worker) 
t.setDaemon(True) 
t.start() 

你需要確保你使用與Django芹菜應用程序實例一樣的應用程序。

請注意,啓動worker會打印很多東西並修改日誌記錄設置。

4

下面是一個更全面的Django TransactionTestCase示例與Celery 4.x一起使用。

import threading 

from django.test import TransactionTestCase 
from django.db import connections 

from myproj.celery import app # your Celery app 


class CeleryTestCase(TransactionTestCase): 
    """Test case with Celery support.""" 

    @classmethod 
    def setUpClass(cls): 
     super().setUpClass() 
     app.control.purge() 
     cls._worker = app.Worker(app=app, pool='solo', concurrency=1) 
     connections.close_all() 
     cls._thread = threading.Thread(target=cls._worker.start) 
     cls._thread.daemon = True 
     cls._thread.start() 

    @classmethod 
    def tearDownClass(cls): 
     cls._worker.stop() 
     super().tearDownClass() 

請注意,這並不會將您的隊列名稱更改爲測試隊列,因此如果您也在運行應用程序,那麼您也需要這樣做。

+0

非常好的答案!它阻止我們使用欺騙工人的'task_always_eager'選項。爲了訪問當前的芹菜應用程序,只需從'celery import current_app'進行操作就簡單多了。 – Raffi

+0

是否有任何理由選擇了「TransactionTestCase」而不是普通的測試用例? – Jonathan

+0

@danielle這可悲的不適合我:我得到conn_errors = self.channel.connection.client.connection_errors AttributeError:'NoneType'對象沒有屬性'客戶端' – Jonathan