2016-11-09 16 views
0

我試圖實現的內容 編寫一個調度程序,它使用數據庫在不同的時間安排類似的任務。使用芹菜節拍在多個時間點(使用不同參數)調度任務

,因爲我是用芹菜節拍相同,下面的代碼片段會給一個想法

try: 
    reader = MongoReader() 
except: 
    raise 
try: 
    tasks = reader.get_scheduled_tasks() 
except: 
    raise 
celerybeat_schedule = dict() 
for task in tasks: 
    celerybeat_schedule[task["task_id"]] =dict() 
    celerybeat_schedule[task["task_id"]]["task"] = task["task_name"] 
    celerybeat_schedule[task["task_id"]]["args"] = (task,) 
    celerybeat_schedule[task["task_id"]]["schedule"] = get_task_schedule(task) 

app.conf.update(BROKER_URL=rabbit_mq_endpoint, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERYBEAT_SCHEDULE=celerybeat_schedule) 

所以這些都是三個步驟 - 創建一個字典,芹菜調度是 - 從數據存儲 閱讀所有任務通過具有特性,TASK_NAME(將運行方法),參數的所有任務填充(數據傳遞給該方法),(當運行存儲)時間表 - 芹菜配置

預期場景 給出的所有ENTR更新此IES運行,只是打印,具有相同的時間表來運行每5分鐘,具有不同的參數指定要打印的內容相同芹菜任務名稱,可以說DB有

task name  , parameter , schedule 
regular_print , Hi  , {"minutes" : 5} 
regular_print , Hello  , {"minutes" : 5} 
regular_print , Bye  , {"minutes" : 5} 

我希望,這些將每5分鐘打印打印所有三個

發生 只有喜的一個什麼,你好,再見打印(可能隨機,肯定不是按順序)

請幫幫忙, 感謝很多提前:)

回答

0

w ^如能使用芹菜4版本解決這個問題。樣品類似於我工作..也可以進行版本找到文檔由芹菜4

#taking address and user-pass from environment(you can mention direct values) 
    ex_host_queue = os.environ["EX_HOST_QUEUE"] 
    ex_port_queue = os.environ["EX_PORT_QUEUE"] 
    ex_user_queue = os.environ["EX_USERID_QUEUE"] 
    ex_pass_queue = os.environ["EX_PASSWORD_QUEUE"] 
    broker= "amqp://"+ex_user_queue+":"+ex_pass_queue+"@"+ex_host_queue+":"+ex_port_queue+"//" 

    #celery initialization 
    app = Celery(__name__,backend=broker, broker=broker) 
    app.conf.task_default_queue = 'scheduler_queue' 
    app.conf.update(
     task_serializer='json', 
     accept_content=['json'], # Ignore other content 
     result_serializer='json' 
    ) 
task = {"task_id":1,"a":10,"b":20} 
##method to update scheduler 
def add_scheduled_task(task): 
    print("scheduling task") 
    del task["_id"] 
    print("adding task_id") 
    name = task["task_name"] 
    app.add_periodic_task(timedelta(minutes=1),add.s(task), name = task["task_id"])  

@app.task(name='scheduler_task') 
def scheduler_task(data): 
    print(str(data["a"]+data["b"]))