2014-03-26

I wrote the multiprocessing function below. It works fine as a plain Python job, but when I run it from a Celery task it raises an AttributeError.

import multiprocessing 
import pandas as pd 
import numpy as np 

def _apply_df(args): 
    df, func, kwargs = args 
    return df.apply(func, **kwargs) 

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    # split the frame into one chunk per worker and apply func to each in parallel
    result = pool.map(_apply_df, [(d, func, kwargs)
                                  for d in np.array_split(df, workers)])
    pool.close()
    pool.join()
    return pd.concat(list(result))

def square(x):
    return x ** 2

if __name__ == '__main__': 
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    result = apply_by_multiprocessing(df, square, axis=1, workers=4)
    ## run by 4 processes
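As a sanity check (an editor's sketch, not part of the original post, using a simple element-wise square for illustration), the split/apply/concat pipeline that apply_by_multiprocessing parallelizes can be reproduced serially; the reassembled result should match a plain df.apply:

```python
import numpy as np
import pandas as pd

def square(x):
    return x ** 2

df = pd.DataFrame({'a': range(10), 'b': range(10)})
workers = 4

# Serial version of the same split/apply/concat pipeline.
# Splitting positional indices mirrors np.array_split(df, workers)
# while staying on the stable pandas indexing API.
parts = [df.iloc[idx].apply(square, axis=1)
         for idx in np.array_split(np.arange(len(df)), workers)]
result = pd.concat(parts)

assert result.equals(df.apply(square, axis=1))
```

Because each chunk keeps its original index, pd.concat reassembles the rows in order; the parallel version relies on exactly the same property.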

The apply_by_multiprocessing function above applies a function to a pandas DataFrame in parallel, and it works on its own. But when I call it from a Celery task, it raises AttributeError: 'Worker' object has no attribute '_config'.

from celery import shared_task 

@shared_task 
def my_multiple_job(): 
    df = pd.DataFrame({'a':range(10), 'b':range(10)}) 
    apply_by_multiprocessing(df, square, axis=1, workers=4) 

The error traceback looks like this:

File "/Users/yong27/work/goldstar/kinmatch/utils.py", line 14, in apply_by_multiprocessing
    pool = multiprocessing.Pool(processes=workers)
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 146, in __init__
    self._setup_queues()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 238, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__
    self._rlock = ctx.Lock()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock
    return Lock(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 164, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 60, in __init__
    kind, value, maxvalue, self._make_name(),
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 118, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'

It seems this happens because a Celery worker is not a normal process. How can I fix this? I'm using Python 3.4, Django 1.6.2, Celery 3.1.10, django-celery 3.1.9, and pandas 0.12.0.


Possible duplicate of [Using multiprocessing Pool from a Celery task raises an exception](http://stackoverflow.com/questions/27904162/using-multiprocessing-pool-from-celery-task-raises-exception) – Akhorus

Answers


I'm not sure why multiprocessing doesn't work there, but I recommend using a Celery group task instead:

from celery import task, group

def feeds_fetch(feeds):
    # fan out one subtask per feed and run them asynchronously
    g = group(fetch_one.s(feed) for feed in feeds)
    g.apply_async()


@task()
def fetch_one(feed):
    return feed.fetch()

I'll look into Celery group tasks. But I have to collect the return value of each task, which doesn't look easy. I didn't have enough time, so I switched from Celery to subprocess.Popen. It's been fine so far. –


This question has a good answer in this other question.

Basically, it's a known issue of Celery, and a dirty hack is provided there. It worked for me: I just added the following code in the same module where my tasks are defined:

from celery.signals import worker_process_init
from multiprocessing import current_process

@worker_process_init.connect
def fix_multiprocessing(**kwargs):
    # Celery worker processes (billiard Workers) are missing the
    # _config attribute that multiprocessing expects; restore it.
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}
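For context (an editor's note, not from the answer): the private attribute this hack restores exists on any ordinary Python process; it is only Celery's billiard-based Worker objects that lack it. Note that _config is a CPython implementation detail and may change between versions:

```python
from multiprocessing import current_process

# multiprocessing builds semaphore names from this private dict
# (see _make_name in the traceback above). In a normal process it
# exists and contains 'semprefix'; a Celery prefork worker's
# process object is billiard's Worker, which doesn't define it,
# hence the AttributeError.
config = current_process()._config
assert 'semprefix' in config
```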