我做了這樣的多道功能,的Python多的工作,芹菜任務,但AttributeError的
import multiprocessing
import pandas as pd
import numpy as np
def _apply_df(args):
df, func, kwargs = args
return df.apply(func, **kwargs)
def apply_by_multiprocessing(df, func, **kwargs):
workers = kwargs.pop('workers')
pool = multiprocessing.Pool(processes=workers)
result = pool.map(_apply_df, [(d, func, kwargs)
for d in np.array_split(df, workers)])
pool.close()
return pd.concat(list(result))
def square(x):
return x**x
if __name__ == '__main__':
df = pd.DataFrame({'a':range(10), 'b':range(10)})
apply_by_multiprocessing(df, square, axis=1, workers=4)
## run by 4 processors
以上「apply_by_multiprocessing」可以執行熊貓數據幀適用於並行。但是當我將它交給Celery任務時,它引發了AssertionError:'Worker'對象沒有'_config'屬性。
from celery import shared_task
@shared_task
def my_multiple_job():
df = pd.DataFrame({'a':range(10), 'b':range(10)})
apply_by_multiprocessing(df, square, axis=1, workers=4)
這是錯誤跟蹤是這樣的,
File "/Users/yong27/work/goldstar/kinmatch/utils.py", line 14, in apply_by_multiprocessing
pool = multiprocessing.Pool(processes=workers)
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 146, in __init__
self._setup_queues()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 238, in _setup_queues
self._inqueue = self._ctx.SimpleQueue()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__
self._rlock = ctx.Lock()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock
return Lock(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 164, in __init__
SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 60, in __init__
kind, value, maxvalue, self._make_name(),
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 118, in _make_name
return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'
看來,由於芹菜的工人是不是一個正常的過程。我怎麼解決這個問題?我使用Python3.4,Django 1.6.2,芹菜3.1.10,django-celery 3.1.9,熊貓0.12.0。
的可能重複[使用芹菜任務多池產生異常(http://stackoverflow.com/questions/27904162/using-multiprocessing-pool-from-celery-task-raises-除離子) – Akhorus