2017-01-30 192 views
4

我有一個API,它返回其他API的列表。執行其他芹菜任務不工作的芹菜週期性任務

我需要訪問這些API每隔15分鐘,並把返回到數據庫中的數據。

下面是我用芹菜和Redis的在celery_worker.py文件中寫道。但是所有的任務都沒有開始。

list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json() 

CELERYBEAT_SCHEDULE = { 
    'every-15-minute': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': timedelta(minutes=15), 
    }, 
} 

@celery.task 
def access_one_API(one_API): 
    return requests.get(one_API).json() 

@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(): 
    for one_API in list_of_APIs: 
      task = access_one_API.delay(one_API) 
      # some codes to put all task.id into a list_of_task_id 

    for task_id in list_of_task_id: 
      # some codes to get the results of all tasks 
      # some codes to put all the results into a database 

fetch_data_of_all_APIs功能應每15分鐘,這是應該使用多個工人跑access_one_API功能

芹菜服務器啓動成功的終端但既不fetch_data_of_all_APIs也不access_one_API開始運行。

如果我提取fetch_data_of_all_APIs函數中的代碼,access_one_API可以啓動並由多個芹菜工作人員執行。但只要我將這些代碼放在一個函數中並用@celery.task來修飾它,那麼這兩個函數都不會啓動。

所以我相信它一定與芹菜有關。

非常感謝提前。

+0

請注意,您需要'@ celery.task()'裝飾器。另外,您需要檢查'celery-beat'配置參數,因爲當前的芹菜版本使用小寫設置。 –

回答

0

這裏例如如何配置週期性任務與子任務芹菜(我設置20秒示範)。 tasks.py:

import celery 
from celery.canvas import subtask 
from celery.result import AsyncResult 
# just for example list of integer values 
list_of_APIs = [1, 2, 3, 4] 


@celery.task(name='access_one_API') 
def access_one_API(api): 
    """ 
    Sum of subtask for demonstration 
    :param int api: 
    :return: int 
    """ 
    return api + api 


@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(list_of_APIs): 
    list_task_ids = [] 

    for api in list_of_APIs: 
     # run of celery subtask and collect id's of subtasks 
     task_id = subtask('access_one_API', args=(api,)).apply_async().id 
     list_task_ids.append(task_id) 

    result_sub_tasks = {} 

    for task_id in list_task_ids: 
     while True: 
      task_result = AsyncResult(task_id) 
      if task_result.status == 'SUCCESS': 
       # if subtask is finish add result and check result of next subtask 
       result_sub_tasks[task_id] = task_result.result 

       break 

    print result_sub_tasks 
    # do something with results of subtasks here... 


app = celery.Celery(
    'tasks', 
    broker='redis://localhost:6379/0', 
    backend='redis://localhost:6379/0', 
) 


app.conf.beat_schedule = { 
    'add-every-20-seconds': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': 20.0, 
     # args for fetch_data_of_all_APIs 
     'args': (list_of_APIs,) 
    }, 
} 

運行芹菜:從終端celery worker -A tasks.app --loglevel=info --beat

跟蹤:

[2017-03-14 10:31:36,361: WARNING/PoolWorker-3] {'929996b3-fc86-4274-b3c3-06c38a6d4edd': 6, 'f44456b4-df93-4a78-9f1d-b2c2d2b05322': 4, '4e44fd57-fbbc-43cd-8616-1eafef559417': 8, '6d943f35-0d74-4319-aa02-30a266aa3cd9': 2} 

希望這有助於。