2015-09-10 46 views
1

我試圖解決芹菜一個問題:使用.replace方法

  • 我有一個查詢爲ID的API,然後一個任務開始的子任務爲每個這些。
  • 我不知道,提前,什麼是ID,或有多少。
  • 對於每個id,我會經歷一個大的計算,然後將一些數據轉儲到數據庫中。
  • 之後全部子任務已完成,我想運行彙總功能(將DB結果導出爲Excel格式)。
  • 理想情況下,我並不想阻止我的主要工作人員查詢的子任務的狀態

這個問題看起來很相似(芹菜,如果你試試這個生氣。)(如果不相同?): Celery: Callback after task hierarchy

因此,使用「解決方案」(這是this discussion一個鏈接,我嘗試以下測試腳本:

# test.py 
from celery import Celery, chord 
from celery.utils.log import get_task_logger 

app = Celery('test', backend='redis://localhost:45000/10?new_join=1', broker='redis://localhost:45000/11') 
app.conf.CELERY_ALWAYS_EAGER = False 

logger = get_task_logger(__name__) 


@app.task(bind=True) 
def get_one(self): 
    print('hello world') 
    self.replace(get_two.s()) 
    return 1 


@app.task 
def get_two(): 
    print('Returning two') 
    return 2 


@app.task 
def sum_all(data): 
    print('Logging data') 
    logger.error(data) 
    return sum(data) 


if __name__ == '__main__': 
    print('Running test') 
    x = chord(get_one.s() for i in range(3)) 
    body = sum_all.s() 
    result = x(body) 

    print(result.get()) 
    print('Finished w/ test') 

它不爲我工作,我得到一個錯誤:

AttributeError: 'get_one' object has no attribute 'replace'

注意,我已經new_join在我的後端URL = 1,雖然不是經紀人。如果我把它放在那裏,我得到一個錯誤:

TypeError: _init_params() got an unexpected keyword argument 'new_join'

我在做什麼錯?我使用Python 3.4.3及以下軟件包:

amqp==1.4.6 
anyjson==0.3.3 
billiard==3.3.0.20 
celery==3.1.18 
kombu==3.0.26 
pytz==2015.4 
redis==2.10.3 

回答