2015-06-10 17 views
6

Próbuję sprawdzić, czy aplikacja próbuje ponownie.Jak przetestować ponów w aplikacji do selera w Pythonie?

@celery.task(bind=False, default_retry_delay=30) 
def convert_video(gif_url, webhook): 
    // doing something 
    VideoManager().convert(gif_url) 
    return 
    except Exception as exc: 
     raise convert_video.retry(exc=exc) 

A ja szyderczy test

@patch('src.video_manager.VideoManager.convert') 
@patch('requests.post') 
def test_retry_failed_task(self, mock_video_manager, mock_requests): 
    mock_video_manager.return_value= {'webm':'file.webm', 'mp4':'file.mp4', 'ogv' : 'file.ogv', 'snapshot':'snapshot.png'} 
    mock_video_manager.side_effect = Exception('some error') 

    server.convert_video.retry = MagicMock() 

    server.convert_video('gif_url', 'http://www.company.com/webhook?attachment_id=1234') 
    server.convert_video.retry.assert_called_with(ANY) 

i otrzymuję ten błąd

TypeError: exceptions must be old-style classes or derived from BaseException, not MagicMock

co jest oczywiste, ale nie wiem jak to zrobić w inny sposób sprawdzić, czy metoda jest wywoływana.

Odpowiedz

3

Nie udało mi się tego zrobić, korzystając z wbudowanej metody ponawiania prób, więc muszę zastosować próbę z efektem ubocznym rzeczywistej ponownej próby, dzięki czemu można ją złapać w teście. Zrobiłem to tak:

from celery.exceptions import Retry 
from mock import MagicMock 
from nose.plugins.attrib import attr 

# Set it for for every task-call (or per task below with @patch) 
task.retry = MagicMock(side_effect=Retry) 

#@patch('task.retry', MagicMock(side_effect=Retry) 
def test_task(self): 
    with assert_raises(Retry): 
     task() # Note, no delay or things like that 

# and the task, I don't know if it works without bind. 
@Celery.task(bind=True) 
def task(self): 
    raise self.retry() 

Jeśli ktoś wie jak mogę pozbyć się dodatkowego etapu w szydząc z „wyjątek” Ponów byłbym szczęśliwy to słyszeć!

1
from mock import patch 

import pytest  


@patch('tasks.convert_video.retry') 
@patch('tasks.VideoManager') 
def test_retry_on_exception(mock_video_manger, mock_retry): 
    mock_video_manger.convert.side_effect = error = Exception() 
    with pytest.raises(Exception): 
     tasks.convert_video('foo', 'bar') 
    mock_retry.assert_called_with(exc=error) 

jesteś również brakuje niektórych rzeczy w swoim zadaniu:

@celery.task(bind=False, default_retry_delay=30) 
def convert_video(gif_url, webhook): 
    try: 
     return VideoManager().convert(gif_url) 
    except Exception as exc: 
     convert_video.retry(exc=exc) 
Powiązane problemy