2015-12-14 91 views
3

我正在嘗試實現一個相當簡單的Celery工作流,​​其中我接收到與元組(或列表)相同任務的多個並行調用的結果。收集來自並行Celery任務執行的結果

@app.task 
def add(x, y): 
    return x + y 

@app.task 
def master(): 
    return group(add.s(1, 2), add.s(3, 4))() 

由此,我想在一個通用的方法檢索(3, 7),即,在不依賴於工作流本身的一種方式。我正在尋找某種「將異步結果圖減少到原語」操作。我已經嘗試用下面的(我已經取代結果的ID與#num爲簡潔起見)

r = master.delay() 
r.get()  # <GroupResult: #1 [#2, #3]> 
r.collect() # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>), 
      # (<GroupResult: #1 [#2, #3]>, [3, 7]) 
      # (<AsyncResult: #2>, 3), 
      # (<GroupResult: #3>, 7)] 

r.get()返回圍繞兩個AsyncResult標識的包裝,所以我將不得不遞歸處理每一個。 r.collect()已接近,但它遞歸太深。

我可以做類似

r.children[0].get() 

但這不是通用的,因爲它明確地依賴於結果圖的結構。另外,我可以遍歷r.collect(),直到我找到一個元組,其價值不是ResultBase一個實例,像

next(value for _, value in r.collect() if not isinstance(value, ResultBase)) 

,但我不知道這是在所有情況下實際上是正確的,我希望有一個更優雅的方式來做到這一點。

如果有一種重構master任務的方式來讓檢索結果更容易,我會對它開放,只要子任務並行啓動即可。任何建議,將不勝感激。先謝謝你。


編輯一個相關的問題是,如果我想打電話r.get()r.collect()之前檢索在非阻塞方式任務結果(例如,通過手動查詢r.status,我不能簡單地做到這一點

r = master.delay() 

# some time later... 
if r.status in READY_STATES: 
    r.get() 

因爲rAsyncResult其解析爲GroupResult,即它的GroupResult或其孩子之前完成。有沒有辦法來調用組的方式那「跳過」頂級AsyncResult?這將解決這兩個問題,因爲r.statusr.get()將分別反映子任務的狀態和值。

回答

1

當然,正確的解決方案是最簡單的解決方案:調用master作爲在當前進程中執行它的函數。

r = master() 
r.get()  # [3, 7] 
r.collect() # [(<GroupResult: #1 [#2, #3]>, [3, 7]), 
      # (<AsyncResult: #2>, 3), 
      # (<AsyncResult: #3>, 7)] 

而是推遲group啓動代碼的工作進程,它在目前的進程啓動。由於group完全是異步的,因此行爲不會改變,性能會提高。

+0

我正面臨着類似的挑戰,您目前遇到了很多麻煩。你能分享一大部分代碼嗎?你在哪裏存儲'r.collect()',是否阻塞?你有'for'循環嗎?一個'while'循環? – zerohedge

+0

這樣做是爲了在小組結束後還是在一組工作正在進行時收集結果? – zerohedge