讓我們先來組織導入,讀取數據,做一些簡單的特殊字符,並去除其轉化爲一個DataFrame
:
import re # needed to remove special character
from pyspark import Row
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.mllib.clustering import LDA
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType
pattern = re.compile('[\W_]+')
rdd = sc.wholeTextFiles("./data/20news-bydate/*/*/*") \
.mapValues(lambda x: pattern.sub(' ', x)).cache() # ref. https://stackoverflow.com/a/1277047/3415409
df = rdd.toDF(schema=['file', 'text'])
我們需要爲每個添加一個索引。下面的代碼片段從這個問題的啓發有關添加primary keys with Apache Spark:
row_with_index = Row(*["id"] + df.columns)
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
indexed = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
一旦我們增加了指數,我們可以進行功能清洗,提取和轉換:
# tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenized = tokenizer.transform(indexed)
# remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="words")
cleaned = remover.transform(tokenized)
# vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
count_vectorizer_model = cv.fit(cleaned)
result = count_vectorizer_model.transform(cleaned)
現在,讓我們變換結果數據框中回RDD
corpus = result.select(F.col('id').cast("long"), 'vectors').rdd \
.map(lambda x: [x[0], x[1]])
我們的數據現在已準備好進行培訓:
# training data
lda_model = LDA.train(rdd=corpus, k=10, seed=12, maxIterations=50)
# extracting topics
topics = lda_model.describeTopics(maxTermsPerTopic=10)
# extraction vocabulary
vocabulary = count_vectorizer_model.vocabulary
我們可以把現在跟隨打印主題描述:
for topic in range(len(topics)):
print("topic {} : ".format(topic))
words = topics[topic][0]
scores = topics[topic][1]
[print(vocabulary[words[word]], "->", scores[word]) for word in range(len(words))]
PS:這上面的代碼與星火1.6.3進行了測試。
如果我可能會問,爲什麼使用MLlib的LDA? LDA提供spark-ml – eliasah
只是試圖跨幾個教程stich片斷。不反對採取不同的做法。 – ADJ
然後我建議看一下spark-ml的官方文檔。這很簡單。正常情況下,您的價值結果已準備就緒。 – eliasah