我正在嘗試實現一個相當簡單的Celery工作流,其中我接收到與元組(或列表)相同任務的多個並行調用的結果。收集來自並行Celery任務執行的結果
@app.task
def add(x, y):
return x + y
@app.task
def master():
return group(add.s(1, 2), add.s(3, 4))()
由此,我想在一個通用的方法檢索(3, 7)
,即,在不依賴於工作流本身的一種方式。我正在尋找某種「將異步結果圖減少到原語」操作。我已經嘗試用下面的(我已經取代結果的ID與#num
爲簡潔起見)
r = master.delay()
r.get() # <GroupResult: #1 [#2, #3]>
r.collect() # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>),
# (<GroupResult: #1 [#2, #3]>, [3, 7])
# (<AsyncResult: #2>, 3),
# (<GroupResult: #3>, 7)]
r.get()
返回圍繞兩個AsyncResult
標識的包裝,所以我將不得不遞歸處理每一個。 r.collect()
已接近,但它遞歸太深。
我可以做類似
r.children[0].get()
但這不是通用的,因爲它明確地依賴於結果圖的結構。另外,我可以遍歷r.collect()
,直到我找到一個元組,其價值不是ResultBase
一個實例,像
next(value for _, value in r.collect() if not isinstance(value, ResultBase))
,但我不知道這是在所有情況下實際上是正確的,我希望有一個更優雅的方式來做到這一點。
如果有一種重構master
任務的方式來讓檢索結果更容易,我會對它開放,只要子任務並行啓動即可。任何建議,將不勝感激。先謝謝你。
編輯一個相關的問題是,如果我想打電話r.get()
或r.collect()
之前檢索在非阻塞方式任務結果(例如,通過手動查詢r.status
,我不能簡單地做到這一點
r = master.delay()
# some time later...
if r.status in READY_STATES:
r.get()
因爲r
是AsyncResult
其解析爲GroupResult
,即它的GroupResult
或其孩子之前完成。有沒有辦法來調用組的方式那「跳過」頂級AsyncResult
?這將解決這兩個問題,因爲r.status
和r.get()
將分別反映子任務的狀態和值。
我正面臨着類似的挑戰,您目前遇到了很多麻煩。你能分享一大部分代碼嗎?你在哪裏存儲'r.collect()',是否阻塞?你有'for'循環嗎?一個'while'循環? – zerohedge
這樣做是爲了在小組結束後還是在一組工作正在進行時收集結果? – zerohedge