2017-10-19 154 views
0

我計算TF和IDF:變換RDD到有效的輸入,使用包含與下面的代碼CSV文件的目錄的火花mllib算法k均值

import argparse 
from os import system 

### args parsing 
parser = argparse.ArgumentParser(description='runs TF/IDF on a directory of 
text docs') 
parser.add_argument("-i","--input", help="the input in HDFS", 
required=True) 
parser.add_argument("-o", '--output', help="the output in HDFS", 
required=True) 
parser.add_argument("-mdf", '--min_document_frequency', default=1) 
args = parser.parse_args() 


docs_dir = args.input 
d_out = "hdfs://master:54310/" + args.output 
min_df = int(args.min_document_frequency) 

# import spark-realated stuff 
from pyspark import SparkContext 
from pyspark.mllib.feature import HashingTF 
from pyspark.mllib.feature import IDF 

sc = SparkContext(appName="TF-IDF") 

# Load documents (one per line). 
documents = sc.textFile(docs_dir).map(lambda title_text: 
title_text[1].split(" ")) 

hashingTF = HashingTF() 
tf = hashingTF.transform(documents) 

# IDF 
idf = IDF().fit(tf) 
tfidf = idf.transform(tf) 

#print(tfidf.collect()) 

#save 
tfidf.saveAsTextFile(d_out) 

使用

print(tfidf.collect()) 

我得到這個輸出:

[SparseVector(1048576, {812399: 4.3307}), SparseVector(1048576, {411697: 
0.0066}), SparseVector(1048576, {411697: 0.0066}), SparseVector(1048576, 
{411697: 0.0066}), SparseVector(1048576, {411697: 0.0066}), .... 

我也有te STED的k均值mllib算法:

from __future__ import print_function 

import sys 

import numpy as np 
from pyspark import SparkContext 
from pyspark.mllib.clustering import KMeans 

runs=4 

def parseVector(line): 
return np.array([float(x) for x in line.split(' ')]) 

if __name__ == "__main__": 
if len(sys.argv) != 3: 
    print("Usage: kmeans <file> <k>", file=sys.stderr) 
    exit(-1) 
sc = SparkContext(appName="KMeans") 
lines = sc.textFile(sys.argv[1]) 
data = lines.map(parseVector) 
k = int(sys.argv[2]) 
model = KMeans.train(data, k, runs) 
print("Final centers: " + str(model.clusterCenters)) 
print("Total Cost: " + str(model.computeCost(data))) 
sc.stop() 

這個樣本測試案例

0.0 0.0 0.0 
0.1 0.1 0.1 
0.2 0.2 0.2 
9.0 9.0 9.0 
9.1 9.1 9.1 
9.2 9.2 9.2 

,它工作正常。

現在我想在KMeans算法中應用上述tfidf的rdd輸出,但我不知道如何像上面的示例文本那樣轉換rdd,或者如何正確拆分KMeans算法中的rdd正常工作。

我真的需要這個幫助。

UPDATE

我真正的問題是我如何可以讀取輸入到它從一個文本文件應用到KMEANS mllib這樣

(1048576,[155412,857472,756332],[1.75642010278,2.41857747478,1.97365255252]) 
(1048576,[159196,323305,501636],[2.98856378408,1.63863706713,2.44956728334]) 
(1048576,[135312,847543,743411],[1.42412015238,1.58759872958,2.]) 

UPDATE2

我不是肯定在所有,但我認爲我需要從矢量上面到下面的陣列,以便直接將它應用到KMeans mllib算法

1.75642010278 2.41857747478 1.97365255252 
2.98856378408 1.63863706713 2.44956728334 
1.42412015238 1.58759872958 2.

回答

1

IDF的輸出是一個數據幀SparseVector。 KMeans將向量作爲輸入(稀疏或密集),因此,不應該進行任何轉換。您應該能夠直接使用IDF的輸出列作爲KMeans的輸入。

如果您需要在運行TFIDF和KMeans之間將數據保存到磁盤,我建議通過dataframe API將其保存爲csv。

首先轉換成數據幀使用:

from pyspark.sql import Row 

row = Row("features") # column name 
df = tfidf.map(row).toDF() 

的另一種方式,而不輸入轉換:

df = tfidf.map(lambda x: (x,)).toDF(["features"]) 

轉換後的數據幀保存爲拼花文件:

df.write.parquet('/path/to/save/file') 

要讀取數據,只需使用:

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df = sqlContext.read.parquet('/path/to/file') 

# converting from dataframe into an RDD[Vector] 
data = df.rdd.map(list) 

如果您在任何情況下,需要從保存爲一個字符串向量轉換,這也是可能的。以下是一些示例代碼:

from pyspark.mllib.linalg import Vectors, VectorUDT 
from pyspark.sql.functions import udf 

df = sc.parallelize(["(7,[1,2,4],[1,1,1])"]).toDF(["features"]) 

parse = udf(lambda s: Vectors.parse(s), VectorUDT()) 
df.select(parse("features")) 

首先使用相同的格式創建示例數據框。然後使用UDF將字符串解析爲向量。如果你想要一個rdd而不是數據框,可以使用上面的代碼在「從parquet讀取」部分進行轉換。


但是,IDF的輸出非常稀疏。矢量的長度爲1048576,其中只有一個值大於1.KMeans不會給你任何有趣的結果。

我建議你看看word2vec。它會給你一個更緊湊的矢量爲每個單詞和聚類這些載體會更有意義。使用這種方法,您可以接收一個單詞映射到它們可以用於聚類的矢量表示。

+0

Thx爲你的回覆,但是當我使用tfidf的輸出(順便保存爲文本文件,你可以在tf idf代碼的最後一行看到)我得到這個errorTypeError:無法轉換類型到Vector中。 – Gmilios

+0

@Gmilios我看到了,所以基本上你的問題是保存來自tfidf的數據,然後在KMeans之前再次讀取它? – Shaido

+0

@Gmilios增加了一些關於這個答案。 – Shaido