2016-11-26 48 views
0

我有一個運行芹菜服務器使用redis作爲borker和結果存儲(python3)。我想有一個任意的函數,它沒有被註冊到服務器,由芹菜工作人員執行。我嘗試使用包marshal(以下Is there an easy way to pickle a python function (or otherwise serialize its code)?)序列化此功能轉移的字節碼的工人:由芹菜工作人員執行任意函數

celery_server.py:

from celery import Celery 
import types 
import marshal 

app = Celery('tasks', broker='redis://[email protected]//', backend='redis://localhost') 

@app.task 
def run_fct(fct_code, args, kwargs): 
    code = marshal.loads(fct.__code__) 
    func = types.FunctionType(code, globals(), "some_func_name") 

    return fct(*args, **kwargs) 

client.py

from celery_server import run_fct 
import marshal 

def calc(x, y): 
    return x*y 

fct_code = marshal.dumps(calc.func_code) 
run_fct.apply_async((fct_code, 10, 2)) 

我得到的以下客戶端錯誤:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte 

的功能bytes_to_strkombu.utils.encoding.py

是否有另一種或更好的方法來執行我的功能?

謝謝你的幫助。

回答

0

我找到了一個解決方案:通過包裝我的功能,使被包裝的函數的芹菜任務作爲參數,解決了UnicodeDecodeError

celery_server.py:

class MyFunction(object): 
    def __init__(self, fct): 
     self.fct_code = marshal.dumps(fct.__code__) 

    def run(self, *args, **kwargs): 
     run_fct.apply_async(args=(self,)+args, kwargs=kwargs, serializer='pickle') 

和client.py:

from celery_server import MyFunction 

myFct = MyFunction(calc) 
myFct.run(x=10, y=2) 

然後它就像一個魅力。

但是,使用包cloud解決還依賴項上的工人一邊我的功能問題,舉例來說,如果我使用的附加功能或包在我的功能:

from celery import Celery 
import cloud 
import pickle 

app = Celery('tasks', broker='redis://[email protected]//', backend='redis://localhost') 

class MyFunction(object): 
    def __init__(self, fct): 
     self.serialized_code = cloud.serialization.cloudpickle.dumps(fct) 

    def run(self, *args, **kwargs): 
     run_fct.apply_async(args=(self,)+args, kwargs=kwargs, serializer='pickle') 

@app.task 
def run_fct(myFct, *args, **kwargs): 
    fct = pickle.loads(myFct.serialized_code) 

    return fct(*args, **kwargs) 

client.py:

from tasks import MyFunction 
import time 

def calc(x, y): 
    time.sleep(5) 
    return x*y 

myFct = MyFunction(calc) 
myFct.run(x=10, y=2) 

但是:目前的cloud不支持python3。