2017-10-11 98 views

When I run the following PySpark code, which uses a UDF:

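from itertools import chain

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import ArrayType, StringType

# NLPFunctions is the asker's own class; it is expensive to construct.
nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))

# nlp is referenced by the function, so Spark tries to pickle it with the UDF.
udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))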

I get the following error: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

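I assume this is because PySpark cannot serialize this custom class. But how can I avoid the overhead of instantiating this expensive object on every call to the parse_ingredients function?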
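The error can be reproduced without Spark, which helps confirm that the lock inside the object is the culprit. The sketch below uses a hypothetical ExpensiveModel class as a stand-in for NLPFunctions (whose internals are not shown in the question) and a hypothetical helper pickling_error; it assumes the real class holds a threading lock somewhere, as the error message suggests.

```python
import pickle
import threading


class ExpensiveModel:
    """Hypothetical stand-in for a class like NLPFunctions that owns a lock."""

    def __init__(self):
        self._lock = threading.Lock()  # lock objects cannot be pickled


def pickling_error(obj):
    """Return the exception raised by pickle.dumps(obj), or None on success."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # typically TypeError for a lock attribute
        return exc
    return None


err = pickling_error(ExpensiveModel())
print(type(err).__name__, err)
```

Any object that reaches the workers through the UDF's closure goes through the same kind of pickling step, so the fix has to keep the object out of what gets pickled or make it picklable.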

Answers

0

Edit: This answer is wrong. The object is still serialized and then deserialized when it is broadcast, so serialization is not avoided. (See: Tips for properly using large broadcast variables?)


Try using a broadcast variable:

sc = SparkContext() 
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format. 

def parse_ingredients(ingredient_lines): 
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0] 
    return list(chain.from_iterable(parsed_ingredients)) 

This proposed solution gives me the same error.

1

Suppose you want to use an Identity class defined like this (identity.py):

class Identity(object):
    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x

You can, for example, use a callable object (f.py) and store the Identity instance as a class member:

from identity import Identity

class F(object):
    # Shared by all instances in a worker process; created on first call.
    identity = None

    def __call__(self, x):
        if F.identity is None:
            F.identity = Identity()
        return F.identity.identity(x)

and use them as follows:

# Assumes an active SparkContext `sc` and SparkSession `spark` (e.g. the pyspark shell).
from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
|    0|
|    1|
|    2|
+-----+
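The reason this pattern works can be checked with plain pickle, outside Spark. In the sketch below, Expensive and LazyCallable are hypothetical names mirroring Identity and F above, and the instances counter is added only for illustration. The wrapper carries no instance state, so it pickles even though the wrapped object refuses to, and the expensive object is built once per process on first use.

```python
import pickle


class Expensive:
    """Hypothetical object that is costly to build and refuses to be pickled."""

    instances = 0  # illustration only: counts how many times we construct it

    def __init__(self):
        Expensive.instances += 1

    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x


class LazyCallable:
    _cached = None  # class attribute, not part of an instance's pickled state

    def __call__(self, x):
        if LazyCallable._cached is None:
            LazyCallable._cached = Expensive()  # built on first call only
        return LazyCallable._cached.identity(x)


# The round trip stands in for shipping the callable to a worker.
wrapper = pickle.loads(pickle.dumps(LazyCallable()))
results = [wrapper(i) for i in range(3)]
print(results, Expensive.instances)
```

Standard pickle stores the class by reference, which is why the class attribute never enters the payload; keeping the class in its own module, as f.py does above, gives Spark's serializer the same opportunity.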

Alternatively, make all dependencies of the NLPFunctions class serializable, or use a standalone function and a closure:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f():
    dict_ = {}
    @udf()
    def f_(x):
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
|     0|
|     1|
|     2|
+------+
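For the other route mentioned above, making the class's dependencies serializable, one common approach is to leave the unpicklable member out of the pickled state and rebuild it afterwards. This is a minimal sketch under the assumption that the only offending member is a lock; PicklableModel, vocabulary, and contains are hypothetical names, not part of NLPFunctions.

```python
import pickle
import threading


class PicklableModel:
    """Hypothetical class that owns a lock but drops it when pickled."""

    def __init__(self, vocabulary):
        self.vocabulary = vocabulary
        self._lock = threading.Lock()

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_lock"]  # keep the unpicklable lock out of the payload
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.Lock()  # recreate a fresh lock after unpickling

    def contains(self, word):
        with self._lock:
            return word in self.vocabulary


clone = pickle.loads(pickle.dumps(PicklableModel({"salt", "flour"})))
print(clone.contains("salt"))
```

Unlike the lazy-initialization patterns, this still ships the whole object to the workers, so it suits objects that are expensive to build but reasonably small to transfer.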