2017-03-16 81 views
1

我有一個芹菜應用程序正在使用後端來存儲已完成任務的結果。從芹菜後端獲取所有任務的列表

當任務排隊/運行時,我可以獲取有關它們的信息,但是在完成後,如何從結果後端獲取所有任務ID的列表?我想在python應用程序中以不依賴於特定後端的方式(例如,我可能想在將來作爲結果存儲區在文件系統和MySQL之間切換)執行此操作。

+0

你只需要「成功」任務或全部?你能解釋一下它是什麼嗎?爲什麼你不能使用[芹菜花](http://flower.readthedocs.io/en/latest/)? –

+1

@DanilaGanchar理想的所有任務。我有一些生成報告的任務。在每週結束時,我想針對一週內的所有生成的報告運行一次總體報告(即獲取成功/失敗以及本週運行的所有任務的結果)。我目前正在使用文件系統後端來獲取結果,所以不需要運行一個額外的webapp(芹菜花)來列出/解析目錄的內容。這是我過去對其他任務排隊系統所做的事情,所以想要檢查一下我在Celery中是否缺少一種方法。 –

回答

1

我認爲最好的解決方案是存儲有關在db中完成的任務的信息。首先它會很容易處理。 這裏只是SQLite的一個例子。表我們的任務:

# You can add specific columns for args, kwargs etc. 
# it is just an example 
CREATE TABLE celery_tasks (
    "id" INTEGER PRIMARY KEY, 
    "task_id" TEXT NOT NULL, 
    "task_name" TEXT NOT NULL, 
    "state" TEXT NOT NULL, 
    "created" TEXT NOT NULL 
) 

我們芹菜應用tasks.py

import celery 
from celery.signals import task_postrun 
from celery.task import Task 
import sqlite3 
from datetime import datetime 


@task_postrun.connect() 
def task_postrun(signal=None, sender=None, task_id=None, task=None, 
       args=None, kwargs=None, retval=None, state=None): 
    # For example we don't want to store info about specific tasks 
    ignored_tasks = ('tasks.ignore_task',) 

    if task.name not in ignored_tasks: 
     # write info about a finished task into SQLite 
     conn = sqlite3.connect('db') 
     conn.execute(
      'INSERT INTO celery_tasks (task_id, task_name, state, created) VALUES (?,?,?,?)', 
      (task_id, task.name, state, datetime.now()) 
     ) 

     conn.commit() 
     conn.close() 


app = celery.Celery(
    'tasks', 
    broker='redis://localhost:6379/0', 
    backend='redis://localhost:6379/0', 
) 


@app.task 
def success_task(): 
    pass 


@app.task 
def failure_task(): 
    raise Exception('something wrong') 


@app.task 
def ignore_task(): 
    """ 
    Example of the task that we want to ignore for SQLite. 
    """ 
    pass 

run_tasks.py

from tasks import success_task, failure_task, ignore_task 

success_task.delay() 
failure_task.delay() 
ignore_task.delay() 

所以,在這之後,你可以使用常規的SQL查詢來獲取完成的任務的任何信息在代碼的任何地方(SELECT * FROM celery_tasks WHERE created ... AND ...

也可以不時清理表格。 我認爲使用db是一個很好的解決方案。