2015-07-19 152 views
0

下面的代碼片段發起任務,我芹菜安裝啓動:芹菜和丟失消息

tasks.py

@app.task(ignore_result=False) 
def asyncTransactionTask(txid): 
    Here I do something with txid and do not schedule additional tasks 

@app.task(ignore_result=True) 
def asyncCheckNotifications(*args): 

    try: 
     payments = # get an array of values 
     payments_tasks = [] 
     for payment in payments: 
      payments_tasks.append(asyncTransactionTask.s(payment)) 

     chain(group(payments_tasks) | asyncCheckNotifications.subtask()).apply_async(countdown=60) 
    except Exception as e: 
     logger.error(str(e)) 
     asyncCheckNotifications.apply_async(countdown=10) 
     raise e 

asyncCheckNotifications.delay() 

我期望看到的asyncCheckNotifications方法運行大約每分鐘,而我收到他們每兩個分鐘。更重要的是,如果我檢查計劃的任務(celery -A myapp inspect scheduled),我會看到方法執行被適當調度,但是當我到達超時時間時,它會被下一分鐘的另一個時間表替換,並且不會執行任何操作。

我正在使用芹菜3.1.8。 消息代理是RabbitMQ 3.2.4。

回答

0

同時,我已經更換了以下解決我的問題:與

chain(group(payments_tasks) | asyncCheckNotifications.subtask()).apply_async(countdown=60) 

如下:

chain(group(payments_tasks) | asyncCheckNotifications.subtask(countdown=60)).delay()