2017-08-02 111 views
0

我想在Spark中使用MongoDB提供的數據運行k-means。 我有一個工作實施例中,其作用對一個平面文件:如何在kmeans中映射Spark中的MongoDB數據?

sc = SparkContext(appName="KMeansExample") # SparkContext 
data = sc.textFile("/home/mhoeller/kmeans_data.txt") 
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')])) 
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random") 

這是平面文件的格式爲:

現在我想用MongoDB的更換簡單文件:

spark = SparkSession \ 
.builder \ 
.appName("myApp") \ 
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \ 
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \ 
.getOrCreate() 

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load() 

# <<<< Here I am missing the parsing >>>>> 

clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random") 

我喜歡瞭解如何映射df中的數據,以便它可以用作kmeans的輸入。

的數據庫的 「佈局」 是:

| - _id:字符串(可爲空=真)
| - field0:二進制(可爲空=真)
| - FIELD1:二進制(可空=真)
| - 場2:二進制(可爲空=真)
| - 場3:二進制(可爲空=真)
| - 字段4:二進制(可爲空=真)
| - 字段5:二進制(空值=真)
| - 基爾D6:二進制(可爲空=真)
| - 字段7:二進制(可爲空=真)
| - 字段8:二元的(可爲空=真)
| - 字段9:二元的(可爲空=真)

回答

1

我喜歡瞭解如何映射df中的數據,以便它可以用作kmeans的輸入。

根據您的代碼段,我假定您使用PySpark。

如果你看看clustering.KMeans的Python API文檔,你可以看到,第一個參數需要使用MongoDB Spark Connector

df = spark.read.format("com.mongodb.spark.sql.DefaultSource") 
       .option("uri","mongodb://127.0.0.1/ycsb.usertable") 
       .load() 

從MongoDB的負載數據,你有什麼要RDD of Vector or convertible sequence types

後,你下面的代碼執行在df是一個DataFrame,所以我們需要將它轉換成可轉換爲Vector類型的東西。

因爲你在你的文本文件例如使用numpy.array,我們可以繼續使用這種陣列式學習轉變。

根據提供的layout,首先我們需要刪除_id列,因爲它不需要進行聚類訓練。有關更多信息,另請參閱Vector數據類型。

通過以上信息,讓我們來看看它:

# Drop _id column and get RDD representation of the DataFrame 
rowRDD = df.drop("_id").rdd 

# Convert RDD of Row into RDD of numpy.array 
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row])) 

# Feed into KMeans 
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random") 

如果你想保持布爾值(真/假),而不是整數(1/0),那麼你可以刪除int部分。如下圖所示:

parsedRdd = rowRDD.map(lambda row: array([x for x in row])) 

把所有的人都在一起:

from numpy import array 
from pyspark.mllib.clustering import KMeans 
import org.apache.spark.sql.SparkSession 

spark = SparkSession \ 
.builder \ 
.appName("myApp") \ 
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \ 
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \ 
.getOrCreate() 

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load() 

rowRDD = df.drop("_id").rdd 
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row])) 

clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random") 
clusters.clusterCenters 
相關問題