我有一個運行芹菜服務器使用redis作爲borker和結果存儲(python3)。我想有一個任意的函數,它沒有被註冊到服務器,由芹菜工作人員執行。我嘗試使用包marshal(以下Is there an easy way to pickle a python function (or otherwise serialize its code)?)序列化此功能轉移的字節碼的工人:由芹菜工作人員執行任意函數
celery_server.py:
from celery import Celery
import types
import marshal
app = Celery('tasks', broker='redis://[email protected]//', backend='redis://localhost')
@app.task
def run_fct(fct_code, args, kwargs):
code = marshal.loads(fct.__code__)
func = types.FunctionType(code, globals(), "some_func_name")
return fct(*args, **kwargs)
client.py
from celery_server import run_fct
import marshal
def calc(x, y):
return x*y
fct_code = marshal.dumps(calc.func_code)
run_fct.apply_async((fct_code, 10, 2))
我得到的以下客戶端錯誤:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte
的功能bytes_to_str
的kombu.utils.encoding.py
。
是否有另一種或更好的方法來執行我的功能?
謝謝你的幫助。