2017-01-24 92 views
1

我想包裝pyspark管道的構造函數。 init構造函數,以及新包裝的構造函數中的猴子補丁。但是,我遇到了一個似乎與Pipeline方式有關的錯誤。 初始化使用裝飾包裝pyspark管道.__ init__和裝飾器

下面是實際執行猴補丁代碼:

def monkeyPatchPipeline(): 
     oldInit = Pipeline.__init__ 

     def newInit(self, **keywordArgs): 
     oldInit(self, stages=keywordArgs["stages"]) 

     Pipeline.__init__ = newInit 

然而,當我運行一個簡單的程序:

import PythonSparkCombinatorLibrary 
from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.feature import HashingTF, Tokenizer 

PythonSparkCombinatorLibrary.TransformWrapper.monkeyPatchPipeline() 
tokenizer = Tokenizer(inputCol="text", outputCol="words") 
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol="features") 
lr = LogisticRegression(maxIter=10, regParam=0.001) 

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) 

我得到這個錯誤:

Traceback (most recent call last): 
    File "C:\<my path>\PythonApplication1\main.py", line 26, in <module> 
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) 
    File "C:<my path>PythonApplication1 \PythonSparkCombinatorLibrary.py", line 36, in newInit 
oldInit(self, stages=keywordArgs["stages"]) 
    File "C:\<pyspark_path>\pyspark\__init__.py", line 98, in wrapper 
    return func(*args, **kwargs) 
File "C:\<pyspark_path>\pyspark\ml\pipeline.py", line 63, in __init__ 
    kwargs = self.__init__._input_kwargs 
AttributeError: 'function' object has no attribute '_input_kwargs' 

縱觀pyspark interfa我看到了Pipeline。 初始化看起來是這樣的:

@keyword_only 
def __init__(self, stages=None): 
    """ 
    __init__(self, stages=None) 
    """ 
    if stages is None: 
     stages = [] 
    super(Pipeline, self).__init__() 
    kwargs = self.__init__._input_kwargs 
    self.setParams(**kwargs) 

並注意到@keyword_only裝飾,我檢查這些代碼以及:

def keyword_only(func): 
    """ 
    A decorator that forces keyword arguments in the wrapped method 
    and saves actual input keyword arguments in `_input_kwargs`. 
    """ 
    @wraps(func) 
    def wrapper(*args, **kwargs): 
     if len(args) > 1: 
      raise TypeError("Method %s forces keyword arguments." % func.__name__) 
     wrapper._input_kwargs = kwargs 
     return func(*args, **kwargs) 
    return wrapper 

我都這個代碼是如何工作擺在首位完全糊塗了,以及爲什麼它似乎導致我自己的包裝問題。我看到包裝器正在爲它自己添加一個_input_kwargs字段,但Pipeline .__ init__如何用自己讀取該字段.__ init __._ input_kwargs?爲什麼當我重新包裝Pipeline .__ init__時會發生同樣的事情?

回答

0

裝飾器101.裝飾器是一個高階函數,它將函數作爲它的第一個參數(通常是唯一的)並返回一個函數。 @註釋僅僅是一個簡單的函數調用一個語法糖,所以以下

@decorator 
def decorated(x): 
    ... 

可以改寫例如作爲:

def decorated_(x): 
    ... 

decorated = decorator(decorated_) 

所以Pipeline.__init__實際上是一個functools.wrappedwrapper其捕獲定義__init__func參數的keyword_only)作爲其關閉的一部分。當它被調用時,它使用收到的kwargs作爲其本身的function attribute。基本上,這裏所發生可以簡化爲:

def f(**kwargs): 
    f._input_kwargs = kwargs # f is in the current scope 

hasattr(f, "_input_kwargs") 
False 
f(foo=1, bar="x") 

hasattr(f, "_input_kwargs") 
True 

當你再包裝(裝飾)__init__外部函數將不會_input_kwargs連接,因此錯誤。如果你想讓它工作,你必須採用同樣的過程中,所用的原__init__,你自己的版本,例如使用相同的裝飾:

@keyword_only 
def newInit(self, **keywordArgs): 
    oldInit(self, stages=keywordArgs["stages"]) 

但我喜歡我在評論中提到的,你應該考慮繼承。