2013-02-19 53 views
11

我有一個類似於here概述的情況,除了不是用多個參數鏈接任務之外,我想鏈接返回具有多個條目的詞典的任務。芹菜任務鏈和訪問** kwargs

這是 - 非常鬆散和抽象---我想要做的事:

tasks.py

@task() 
def task1(item1=None, item2=None): 
    item3 = #do some stuff with item1 and item2 to yield item3 
    return_object = dict(item1=item1, item2=item2, item3=item3) 
    return return_object 

def task2(item1=None, item2=None, item3=None): 
    item4 = #do something with item1, item2, item3 to yield item4 
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4) 
    return return_object 

從IPython中工作,我能夠獨立和異步調用TASK1 ,沒有問題。

我也可以單獨調用TASK2與TASK1作爲雙星參數返回的結果:

>>res1 = task1.s(item1=something, item2=something_else).apply_async() 
>>res1.status 
'SUCCESS' 
>>res2 = task2.s(**res1.result).apply_async() 
>>res2.status 
'SUCCESS 

不過,我最終要實現的是同樣的最終結果同上,但通過鏈條,在這裏,我無法弄清楚如何有TASK2與(位置)參數通過TASK1返回未實例化,但與task1.result爲** kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK! 

我懷疑,我可以回去並重寫我的任務,以便t嘿返回位置參數,而不是一個字典,這可能會清除事情,但在我看來,應該有一些方法來訪問任務2中的task1的返回對象與**雙星的等效功能。我也懷疑我在這裏錯過了一些關於Celery子任務實現或* args vs. ** kwargs的相當明顯的東西。

希望這是有道理的。並提前感謝任何提示。

回答

1

chain和其他畫布原語位於 功能實用程序系列中,如mapreduce

E.g.其中map(target, items)爲列表中的每個項目調用target(item), Python有一個叫做itertools.starmap, 的映射的很少使用的版本,它改爲調用target(*item)

雖然我們可以將starchain甚至​​添加到工具箱,但這些 將會非常專業化,可能不會經常使用。

有趣的是,Python使這些列表和發生器表達式 變得沒有必要,因此map被[target(item) for item in item]和starmap替換爲[target(*item) for item in item]

因此,我們不應該爲每個基元實施幾種替代方案,而應該着重於尋找一種更靈活的方式來支持這種方法。像是有芹菜動力的發電機表達式(如果可能的話,如果沒有類似的功能)

+0

明白了。謝謝。我最終通過稍微改變輸入/返回到我的任務來解決這個問題。 T2現在只是查找一個單詞對象作爲輸入,然後從該詞典中檢索期望的k /值對以執行該任務。 – 2013-02-21 14:50:11

+0

@BenjaminWhite我還是不明白。你能告訴我你是如何做到這一點的 – ashim888 2016-04-01 12:56:26

1

由於這不是內置到芹菜,我寫了一個裝飾器功能類似於我自己。

# Use this wrapper with functions in chains that return a tuple. The 
# next function in the chain will get called with that the contents of 
# tuple as (first) positional args, rather than just as just the first 
# arg. Note that both the sending and receiving function must have 
# this wrapper, which goes between the @task decorator and the 
# function definition. This wrapper should not otherwise interfere 
# when these conditions are not met. 

class UnwrapMe(object): 
    def __init__(self, contents): 
     self.contents = contents 

    def __call__(self): 
     return self.contents 

def wrap_for_chain(f): 
    """ Too much deep magic. """ 
    @functools.wraps(f) 
    def _wrapper(*args, **kwargs): 
     if type(args[0]) == UnwrapMe: 
      args = list(args[0]()) + list(args[1:]) 
     result = f(*args, **kwargs) 

     if type(result) == tuple and current_task.request.callbacks: 
      return UnwrapMe(result) 
     else: 
      return result 
    return _wrapper 

礦解開像starchain概念,但你可以很容易地修改它解開kwargs來代替。

5

這是我拿的問題,用一個抽象的任務類:

from __future__ import absolute_import 
from celery import Task 
from myapp.tasks.celery import app 


class ChainedTask(Task): 
    abstract = True  

    def __call__(self, *args, **kwargs): 
     if len(args) == 1 and isinstance(args[0], dict): 
      kwargs.update(args[0]) 
      args =() 
     return super(ChainedTask, self).__call__(*args, **kwargs) 

@app.task(base=ChainedTask) 
def task1(x, y): 
    return {'x': x * 2, 'y': y * 2, 'z': x * y}  


@app.task(base=ChainedTask) 
def task2(x, y, z): 
    return {'x': x * 3, 'y': y * 3, 'z': z * 2} 

現在,您可以定義和執行你的鏈,例如:

from celery import chain 

pipe = chain(task1.s(x=1, y=2) | task2.s()) 
pipe.apply_async()