2017-02-16 29 views
3

我試圖從LDA模型中獲得術語ID的相應主題單詞。UDF將單詞映射到術語索引中的Spark

這裏是主題的數據幀和它的字分佈與LDA在星火

topics_desc=ldaModel.describeTopics(20) 
topics_desc.show(1) 
+-----+--------------------+--------------------+ 
|topic|   termIndices|   termWeights| 
+-----+--------------------+--------------------+ 
| 0|[0, 39, 68, 43, 5...|[0.06362107696025...| 
+-----+--------------------+--------------------+ 
only showing top 1 row 

現在,因爲我們有termIndices,而不是實際的話,我想另一列添加到這將是該數據幀相應termIndices的字樣。

現在,因爲我在Spark中運行了CountVectorizer,我使用該模型並獲取如下所示的單詞列表。

# Creating Term Frequency Vector for each word 
    cv=CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0) 
    cvModel=cv.fit(swremoved_df) 

cvModel.vocabulary給出單詞列表。

所以現在這裏是一個UDF我寫來獲取映射:

from pyspark.sql.functions import udf 
from pyspark.sql.types import ArrayType 

def term_to_words(termindices): 
    """ To get the corresponding words from term indices 

    """ 


    return np.array(cvModel.vocabulary)[termindices] 

term_to_words_conv=udf(term_to_words) 


topics=topics_desc.withColumn("topics_words",term_to_words_conv("termIndices")) 

我因爲在numpy的數組我可以索引通過傳遞指數的升降轉換的列表,以NP陣列的原因,其中一個可以」不要在列表中做這件事。

但我得到這個錯誤。我不確定爲什麼這樣,因爲我在這裏幾乎沒有做任何事情。

Py4JError: An error occurred while calling o443.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) 
    at py4j.Gateway.invoke(Gateway.java:272) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 

編輯:

,所以我想利用映射函數,而不是UDF

def term_to_words(x): 
    """ Mapper function to get the corresponding words for the term index 

    """ 

    row=x.asDict() 
    word_list=np.array(cvModel.vocabulary) 

    return (row['topic'],row['termIndices'],row['termWeights'],word_list[row[termindices]]) 


topics_rdd=topics_desc.rdd.map(term_to_words) 

/Users/spark2/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    931   # SparkContext#runJob. 
    932   mappedRDD = rdd.mapPartitions(partitionFunc) 
--> 933   port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 
    934   return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 
    935 

AttributeError: 'NoneType' object has no attribute 'sc' 
+0

您必須改用'topics_desc.rdd.map(...)。toDF()'。在'map'函數中,您可以將字典輸出轉換爲Spark'Row',以便還可以轉換回數據幀。 – titipata

+0

爲什麼會失敗?我的意思是我嘗試了一個看起來很好的mapper函數。但是給出了一個錯誤。請參閱編輯 – Baktaawar

回答

3

這裏有兩個不同的問題:

  • CountVectorizer是Java對象的包裝。它不能被序列化並通過閉包傳遞。出於同樣的原因,您不能在map關閉中使用它。
  • 您不能從UDF返回NumPy類型。

例如,您可以:

from pyspark.sql.types import ArrayType, StringType 

def indices_to_terms(vocabulary): 
    def indices_to_terms(xs): 
     return [vocabulary[int(x)] for x in xs] 
    return udf(indices_to_terms, ArrayType(StringType())) 

用法:

topics_desc.withColumn(
    "topics_words", indices_to_terms(cvModel.vocabulary)("termIndices")) 

如果你想從UDF返回之前使用NumPy的數組你必須使用tolist()方法。

-1

如果可以,請使用StringIndexerIndexToStringStringIndexer將從一列中爲您創建術語索引,IndexToString將根據索引(您的*term_to_words*函數)查找字符串值。鏈接的Spark文檔中有代碼示例。