我試圖解決芹菜一個問題:使用.replace方法
- 我有一個查詢爲ID的API,然後一個任務開始的子任務爲每個這些。
- 我不知道,提前,什麼是ID,或有多少。
- 對於每個id,我會經歷一個大的計算,然後將一些數據轉儲到數據庫中。
- 之後全部子任務已完成,我想運行彙總功能(將DB結果導出爲Excel格式)。
- 理想情況下,我並不想阻止我的主要工作人員查詢的子任務的狀態
這個問題看起來很相似(芹菜,如果你試試這個生氣。)(如果不相同?): Celery: Callback after task hierarchy
因此,使用「解決方案」(這是this discussion一個鏈接,我嘗試以下測試腳本:
# test.py
from celery import Celery, chord
from celery.utils.log import get_task_logger
app = Celery('test', backend='redis://localhost:45000/10?new_join=1', broker='redis://localhost:45000/11')
app.conf.CELERY_ALWAYS_EAGER = False
logger = get_task_logger(__name__)
@app.task(bind=True)
def get_one(self):
print('hello world')
self.replace(get_two.s())
return 1
@app.task
def get_two():
print('Returning two')
return 2
@app.task
def sum_all(data):
print('Logging data')
logger.error(data)
return sum(data)
if __name__ == '__main__':
print('Running test')
x = chord(get_one.s() for i in range(3))
body = sum_all.s()
result = x(body)
print(result.get())
print('Finished w/ test')
它不爲我工作,我得到一個錯誤:
AttributeError: 'get_one' object has no attribute 'replace'
注意,我做已經new_join在我的後端URL = 1,雖然不是經紀人。如果我把它放在那裏,我得到一個錯誤:
TypeError: _init_params() got an unexpected keyword argument 'new_join'
我在做什麼錯?我使用Python 3.4.3及以下軟件包:
amqp==1.4.6
anyjson==0.3.3
billiard==3.3.0.20
celery==3.1.18
kombu==3.0.26
pytz==2015.4
redis==2.10.3
好的。謝謝。是的,我已經閱讀了更新日誌,因此認爲它是在3.1版本中。我也通過源搜索並認爲它在那裏。 – Symmitchry