2015-06-10 151 views
6

我想測試應用程序是否正在重試。如何使用Python測試Celery應用程序中的重試?

​​

而且我嘲笑測試

@patch('src.video_manager.VideoManager.convert') 
@patch('requests.post') 
def test_retry_failed_task(self, mock_video_manager, mock_requests): 
    mock_video_manager.return_value= {'webm':'file.webm', 'mp4':'file.mp4', 'ogv' : 'file.ogv', 'snapshot':'snapshot.png'} 
    mock_video_manager.side_effect = Exception('some error') 

    server.convert_video.retry = MagicMock() 

    server.convert_video('gif_url', 'http://www.company.com/webhook?attachment_id=1234') 
    server.convert_video.retry.assert_called_with(ANY) 

而且我得到這個錯誤

TypeError: exceptions must be old-style classes or derived from BaseException, not MagicMock

這是明顯的,但我不知道該怎麼做,否則它測試該方法是否被調用。

回答

3

我沒有得到它只使用內置重試的工作,所以我必須使用真正的重試的副作用模擬,這使得有可能在測試中捕捉它。 我已經做了這樣的:

from celery.exceptions import Retry 
from mock import MagicMock 
from nose.plugins.attrib import attr 

# Set it for for every task-call (or per task below with @patch) 
task.retry = MagicMock(side_effect=Retry) 

#@patch('task.retry', MagicMock(side_effect=Retry) 
def test_task(self): 
    with assert_raises(Retry): 
     task() # Note, no delay or things like that 

# and the task, I don't know if it works without bind. 
@Celery.task(bind=True) 
def task(self): 
    raise self.retry() 

如果有人知道我怎麼能在嘲諷重試「異常」擺脫額外的步驟,我很樂意聽到它!

1
from mock import patch 

import pytest  


@patch('tasks.convert_video.retry') 
@patch('tasks.VideoManager') 
def test_retry_on_exception(mock_video_manger, mock_retry): 
    mock_video_manger.convert.side_effect = error = Exception() 
    with pytest.raises(Exception): 
     tasks.convert_video('foo', 'bar') 
    mock_retry.assert_called_with(exc=error) 

你還缺少你的任務的一些東西:

@celery.task(bind=False, default_retry_delay=30) 
def convert_video(gif_url, webhook): 
    try: 
     return VideoManager().convert(gif_url) 
    except Exception as exc: 
     convert_video.retry(exc=exc)