1
我有一個芹菜應用程序正在使用後端來存儲已完成任務的結果。從芹菜後端獲取所有任務的列表
當任務排隊/運行時,我可以獲取有關它們的信息,但是在完成後,如何從結果後端獲取所有任務ID的列表?我想在python應用程序中以不依賴於特定後端的方式(例如,我可能想在將來作爲結果存儲區在文件系統和MySQL之間切換)執行此操作。
我有一個芹菜應用程序正在使用後端來存儲已完成任務的結果。從芹菜後端獲取所有任務的列表
當任務排隊/運行時,我可以獲取有關它們的信息,但是在完成後,如何從結果後端獲取所有任務ID的列表?我想在python應用程序中以不依賴於特定後端的方式(例如,我可能想在將來作爲結果存儲區在文件系統和MySQL之間切換)執行此操作。
我認爲最好的解決方案是存儲有關在db中完成的任務的信息。首先它會很容易處理。 這裏只是SQLite
的一個例子。表我們的任務:
# You can add specific columns for args, kwargs etc.
# it is just an example
CREATE TABLE celery_tasks (
"id" INTEGER PRIMARY KEY,
"task_id" TEXT NOT NULL,
"task_name" TEXT NOT NULL,
"state" TEXT NOT NULL,
"created" TEXT NOT NULL
)
我們芹菜應用tasks.py
:
import celery
from celery.signals import task_postrun
from celery.task import Task
import sqlite3
from datetime import datetime
@task_postrun.connect()
def task_postrun(signal=None, sender=None, task_id=None, task=None,
args=None, kwargs=None, retval=None, state=None):
# For example we don't want to store info about specific tasks
ignored_tasks = ('tasks.ignore_task',)
if task.name not in ignored_tasks:
# write info about a finished task into SQLite
conn = sqlite3.connect('db')
conn.execute(
'INSERT INTO celery_tasks (task_id, task_name, state, created) VALUES (?,?,?,?)',
(task_id, task.name, state, datetime.now())
)
conn.commit()
conn.close()
app = celery.Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
)
@app.task
def success_task():
pass
@app.task
def failure_task():
raise Exception('something wrong')
@app.task
def ignore_task():
"""
Example of the task that we want to ignore for SQLite.
"""
pass
run_tasks.py
:
from tasks import success_task, failure_task, ignore_task
success_task.delay()
failure_task.delay()
ignore_task.delay()
所以,在這之後,你可以使用常規的SQL查詢來獲取完成的任務的任何信息在代碼的任何地方(SELECT * FROM celery_tasks WHERE created ... AND ...
)
也可以不時清理表格。 我認爲使用db是一個很好的解決方案。
你只需要「成功」任務或全部?你能解釋一下它是什麼嗎?爲什麼你不能使用[芹菜花](http://flower.readthedocs.io/en/latest/)? –
@DanilaGanchar理想的所有任務。我有一些生成報告的任務。在每週結束時,我想針對一週內的所有生成的報告運行一次總體報告(即獲取成功/失敗以及本週運行的所有任務的結果)。我目前正在使用文件系統後端來獲取結果,所以不需要運行一個額外的webapp(芹菜花)來列出/解析目錄的內容。這是我過去對其他任務排隊系統所做的事情,所以想要檢查一下我在Celery中是否缺少一種方法。 –