2014-02-10 38 views
2

我想將客戶端創建的函數傳遞給芹菜任務並將其執行。例如下面,我試圖編寫一個map函數,該函數在芹菜任務中使用函數f和列表l並執行map(f, l)將客戶端函數傳遞給芹菜任務

推測,該函數沒有正確序列化(可以理解,這很難)。但是,有沒有辦法做到這一點?最佳做法是什麼?我想我可以傳遞一個字符串然後exec它,但我寧願不只是我的應用程序的工作方式。

編輯:我找到了一種方式serialize a function ...我想我可以把它包起來做我需要做的事情。任何更好的想法?


from celery import Celery 

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost') 

@app.task 
def cp_map(f, l): 
    return map(f, l) 

然後,我嘗試使用此任務與此:

In [20]: from tasks import cp_map 

In [21]: def f(x): return x + 1 

In [22]: cp_map.delay(f, [1,2,3]) 
Out[22]: <AsyncResult: 27baf8bf-8ef3-496d-a445-ebd7ee94e206> 

In [23]: _.status 
Out[23]: 'PENDING' 

在工人,我得到這個:

[2014-02-09 22:27:00,828: CRITICAL/MainProcess] Can't decode message body: DecodeError(AttributeError("'module' object has no attribute 'f'",),) (type:u'application/x-python-serialize' encoding:u'binary' raw:"'\\x80\\x02}q\\x01(U\\x07expiresq\\x02NU\\x03utcq\\x03\\x88U\\x04argsq\\x04c__main__\\nf\\nq\\x05]q\\x06(K\\x01K\\x02K\\x03e\\x86q\\x07U\\x05chordq\\x08NU\\tcallbacksq\\tNU\\x08errbacksq\\nNU\\x07tasksetq\\x0bNU\\x02idq\\x0cU$27baf8bf-8ef3-496d-a445-ebd7ee94e206q\\rU\\x07retriesq\\x0eK\\x00U\\x04taskq\\x0fU\\x0ctasks.cp_mapq\\x10U\\ttimelimitq\\x11NN\\x86U\\x03etaq\\x12NU\\x06kwargsq\\x13}q\\x14u.' (233b)"') 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 585, in _receive_callback 
    decoded = None if on_m else message.decode() 
    File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 142, in decode 
    self.content_encoding, accept=self.accept) 
    File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads 
    return decode(data) 
    File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__ 
    self.gen.throw(type, value, traceback) 
    File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 59, in _reraise_errors 
    reraise(wrapper, wrapper(exc), sys.exc_info()[2]) 
    File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 55, in _reraise_errors 
    yield 
    File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads 
    return decode(data) 
    File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 64, in pickle_loads 
    return load(BytesIO(s)) 
DecodeError: 'module' object has no attribute 'f' 

回答

4

您可以使用元帥序列化函數轉換爲字符串,然後在任務中對其進行反序列化。我不知道這是否是最好的方式,但它會起作用。你也可以看看dill

下面是一些例子代碼another stackoverflow answer複製:

import marshal 
def foo(x): return x*x 
code_string = marshal.dumps(foo.func_code) 

然後在任務:如果你引用的功能坐落在一個文件

import marshal, types 

code = marshal.loads(code_string) 
func = types.FunctionType(code, globals(), "some_func_name") 

func(10) # gives 100 
+0

這就是我最終做的。謝謝! –

+0

PyPI上的'雲'庫包含用於序列化本地函數的實用程序,據報道它工作得很好。 – asksol

0

也會發生這種類型的錯誤__main__指令,即:包含函數定義的文件如下所示:

def f(*args): 
    ... some code here ... 

if __name__ == "__main__": 
    ... some code here ... 

如果是這種情況,將函數定義放在與「__main__」引用代碼分開的文件中應解決該問題。

假設這個簡單的重構確實適用於你的用例,它比上面的元帥jiujitsu簡單得多。

1

有時您可能會遇到編碼複雜函數的問題。您可以在編碼過程中忽略或替換錯誤。

__ENCODING_FORMAT = 'ISO-8859-1' # ISO-8859-1 is a work-around to avoid UnicodeDecodeError on 'byte 0x83' 

def _serialize_func(func): 
    """ converts func into code string and encodes it with ISO-8859-1 """ 
    return unicode(marshal.dumps(func.func_code), encoding=__ENCODING_FORMAT, errors='replace') 


def _load_func(code_string): 
    """ loads func from code string, decodes from unicode to str""" 
    code = marshal.loads(code_string.encode(__ENCODING_FORMAT)) 
    return types.FunctionType(code, globals())