如果條件不符合,我想顯式地使任務失敗。例如: @app.task(bind=True, name="task.my_task", max_retries=2)
def my_task(self, filename):
result = get_result(filename)
if result is None:
self.update_state(task_id
我知道這將被視爲重複,但我有環顧四周問這個問題之前,但所有的問題似乎是過期或不符合我的問題在所有幫助。這是我寫這個問題之前看了: How do you unit test a Celery task?(5歲,全死鏈接) How to unit test code that runs celery tasks?(2歲) How do I capture Celery tasks during
我正在嘗試對每日運行的芹菜任務運行單元測試。 我試過導入函數並在我的測試中調用它,但這不起作用。 的任務是: @shared_task
def create_a_notification_if_a_product_is_in_or_out_of_season():
"""
Send a notification if a product is now in or out o