2017-07-14 39 views
2

我正在嘗試對每日運行的芹菜任務運行單元測試。
我試過導入函數並在我的測試中調用它,但這不起作用。是否可以對芹菜週期性任務運行單元測試?

的任務是:

@shared_task 

def create_a_notification_if_a_product_is_in_or_out_of_season(): 
    """ 
    Send a notification if a product is now in or out of season 
    """ 
    julian_date = date.today().timetuple().tm_yday + 1 
    active_products = Product.objects.filter(status='ACTIVE') 

    for products in active_products: 
     in_season_prd = ProductDescription.objects.filter(
      product=products, 
      early_start_julian=julian_date 
     ) 
     for prd in in_season_prd: 
      notification = Notification() 
      notification.type = notification_choices.PRODUCT_IN_SEASON 
      notification.description = str(prd.product.name) + " will be in season from tomorrow." 
      notification.save() 

,這裏是我的測試中的一個例子:

def test_when_product_is_about_to_come_in_to_seasonality(self): 
    """ 
    Make a notification when a product is due to come in to seasonality tomorrow 
    """ 
    p = Product.objects.first() 
    p.status = "ACTIVE" 
    today = date.today().timetuple().tm_yday 
    p.early_start_julian = today + 1 
    create_a_notification_if_a_product_is_in_or_out_of_season() 
    updated_notifications = Notification.objects.all().count() 
    self.assertNotEqual(self.current_notifications, updated_notifications) 

任何幫助,將不勝感激!

感謝

回答

0

您可以apply()你的芹菜任務同步執行它:

def test_when_product_is_about_to_come_in_to_seasonality(self): 
    """ 
    Make a notification when a product is due to come in to seasonality tomorrow 
    """ 
    p = Product.objects.first() 
    p.status = "ACTIVE" 
    today = date.today().timetuple().tm_yday 
    p.early_start_julian = today + 1 
    create_a_notification_if_a_product_is_in_or_out_of_season.apply() 
    updated_notifications = Notification.objects.all().count() 
    self.assertNotEqual(self.current_notifications, updated_notifications) 
+0

我試過這個,我有相同的結果,任務不運行應該創建一個通知,因此當我比較自我。 current_notifications與updated_notification他們仍然相等,並且測試失敗 –

+0

我意識到,這仍然是失敗的原因是由於我建立了工廠,它創建了產品來運行測試。將.apply()添加到導入任務的末尾仍然有效,但任務正在觸發。非常感謝! –

+0

高興地幫忙@BenCurrie :) –

0

我認爲你正在尋找CELERY_ALWAYS_EAGER設置。如果設置爲True它將同步運行您的任務。你可以在你的測試設置中設置它,或者你可以裝飾只有該測試@override_settings(CELERY_ALWAYS_EAGER=True)

+0

我剛試過這個,但它似乎仍然沒有運行任務,因爲沒有通知被創建,並且測試失敗。不過謝謝 –