
Preparing data for LDA training with PySpark 1.6

I have a corpus of documents that I am reading into a Spark data frame. I have tokenized and vectorized the text, and now I want to feed the vectorized data into the mllib LDA model. The LDA API documentation seems to require the data to be:

rdd – RDD of documents, which are tuples of document IDs and term (word) count vectors. The term count vectors are "bags of words" with a fixed-size vocabulary (where the vocabulary size is the length of the vector). Document IDs must be unique and >= 0.
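
If I am reading that right, a minimal hand-built example of the expected shape would look like this (the vocabulary size and term counts here are made up purely for illustration):

from pyspark.mllib.linalg import Vectors

# each element: (unique document id >= 0, term-count vector over a fixed vocabulary)
toy_corpus = sc.parallelize([
    (0, Vectors.sparse(5, {0: 2.0, 3: 1.0})),  # doc 0: term 0 twice, term 3 once
    (1, Vectors.sparse(5, {1: 1.0, 4: 3.0})),  # doc 1: term 1 once, term 4 three times
])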

How do I get a suitable rdd from my data frame?

from pyspark.mllib.clustering import LDA 
from pyspark.ml.feature import Tokenizer 
from pyspark.ml.feature import CountVectorizer 

#read the data 
tf = sc.wholeTextFiles("20_newsgroups/*") 

#transform into a data frame 
df = tf.toDF(schema=['file','text']) 

#tokenize 
tokenizer = Tokenizer(inputCol="text", outputCol="words") 
tokenized = tokenizer.transform(df) 

#vectorize 
cv = CountVectorizer(inputCol="words", outputCol="vectors") 
model = cv.fit(tokenized) 
result = model.transform(tokenized) 

#transform into a suitable rdd 
myrdd = ? 

#LDA 
model = LDA.train(myrdd, k=2, seed=1) 

PS: I am using Apache Spark 1.6.3

If I may ask, why use MLlib's LDA? LDA is available in spark-ml. – eliasah

Just trying to stitch together pieces from several tutorials. Not opposed to taking a different approach. – ADJ

Then I suggest taking a look at the official spark-ml documentation. It is quite straightforward; normally your result value is ready to use as-is. – eliasah
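
For reference, a minimal sketch of the spark-ml route suggested above. One caveat to flag: the Python pyspark.ml.clustering.LDA API only arrived in Spark 2.0, so this assumes upgrading from 1.6.3:

from pyspark.ml.clustering import LDA as MLLDA  # Spark 2.0+ only

# spark-ml's LDA consumes the CountVectorizer output DataFrame directly,
# so no document ids or RDD conversion are needed
ml_lda = MLLDA(k=2, seed=1, featuresCol="vectors")
ml_model = ml_lda.fit(result)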

Answer

Let's start by organizing the imports, reading the data, removing some simple special characters, and transforming it all into a DataFrame:

import re  # needed to remove special characters
from pyspark.sql import Row

from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.mllib.clustering import LDA
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

# collapse every run of non-word characters into a single space
pattern = re.compile(r'[\W_]+')

rdd = sc.wholeTextFiles("./data/20news-bydate/*/*/*") \
    .mapValues(lambda x: pattern.sub(' ', x)).cache()  # ref. https://stackoverflow.com/a/1277047/3415409

df = rdd.toDF(schema=['file', 'text']) 

We need to add an id to each row. The snippet below was inspired by this question about adding primary keys with Apache Spark:

row_with_index = Row(*["id"] + df.columns)

# build a Row factory that prepends the unique id to the existing columns
def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

indexed = (df.rdd
           .zipWithUniqueId()      # -> ((file, text), uid) pairs
           .map(lambda x: f(*x))   # -> Row(id, file, text)
           .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
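
As a quick sanity check, the new id column should now come first (output sketched by hand from the schema built above):

indexed.printSchema()
# root
#  |-- id: long (nullable = false)
#  |-- file: string (nullable = true)
#  |-- text: string (nullable = true)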

Once we have added the index, we can proceed with the feature cleaning, extraction and transformation:

# tokenize 
tokenizer = Tokenizer(inputCol="text", outputCol="tokens") 
tokenized = tokenizer.transform(indexed) 

# remove stop words 
remover = StopWordsRemover(inputCol="tokens", outputCol="words") 
cleaned = remover.transform(tokenized) 

# vectorize 
cv = CountVectorizer(inputCol="words", outputCol="vectors") 
count_vectorizer_model = cv.fit(cleaned) 
result = count_vectorizer_model.transform(cleaned) 
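
As an aside, on a corpus the size of 20 newsgroups you may want to bound the vocabulary. CountVectorizer supports this through its vocabSize and minDF parameters; the values below are arbitrary placeholders, not tuned recommendations:

cv = CountVectorizer(inputCol="words", outputCol="vectors",
                     vocabSize=10000,  # keep only the 10,000 most frequent terms
                     minDF=2.0)        # drop terms appearing in fewer than 2 documents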

Now, let's transform the result data frame back into an RDD of [id, vector] pairs:

corpus = result.select(F.col('id').cast("long"), 'vectors').rdd \ 
    .map(lambda x: [x[0], x[1]]) 

Our data is now ready for training:

# train the LDA model
lda_model = LDA.train(rdd=corpus, k=10, seed=12, maxIterations=50)
# extract the topics
topics = lda_model.describeTopics(maxTermsPerTopic=10)
# extract the vocabulary
vocabulary = count_vectorizer_model.vocabulary

describeTopics returns, for each topic, a pair of parallel arrays (term indices and term weights), so we can now print the topic descriptions as follows:

for topic_id, (terms, weights) in enumerate(topics):
    print("topic {} : ".format(topic_id))
    for term, weight in zip(terms, weights):
        print(vocabulary[term], "->", weight)

PS: The code above was tested with Spark 1.6.3.