我使用Spark 2.0.2。我有數據是按日劃分的。我想將彼此獨立的不同分區聚類在一起,並比較聚類中心(計算它們之間的距離)以查看聚類將如何隨時間變化。Spark MLlib集羣中心具有不同的維度
我爲每個parition做了完全相同的預處理(縮放,一個熱門編碼等)。我爲此使用了預定義的管道,它在「正常」學習和預測環境中完美工作。但是當我想計算聚類中心之間的距離時,不同分區的相應向量具有不同的大小(不同的維度)。
一些代碼片斷:
預處理管線是建立這樣的:
val protoIndexer = new StringIndexer().setInputCol("protocol").setOutputCol("protocolIndexed").setHandleInvalid("skip")
val serviceIndexer = new StringIndexer().setInputCol("service").setOutputCol("serviceIndexed").setHandleInvalid("skip")
val directionIndexer = new StringIndexer().setInputCol("direction").setOutputCol("directionIndexed").setHandleInvalid("skip")
val protoEncoder = new OneHotEncoder().setInputCol("protocolIndexed").setOutputCol("protocolEncoded")
val serviceEncoder = new OneHotEncoder().setInputCol("serviceIndexed").setOutputCol("serviceEncoded")
val directionEncoder = new OneHotEncoder().setInputCol("directionIndexed").setOutputCol("directionEncoded")
val scaleAssembler = new VectorAssembler().setInputCols(Array("duration", "bytes", "packets", "tos", "host_count", "srv_count")).setOutputCol("scalableFeatures")
val scaler = new StandardScaler().setInputCol("scalableFeatures").setOutputCol("scaledFeatures")
val featureAssembler = new VectorAssembler().setInputCols(Array("scaledFeatures", "protocolEncoded", "urgent", "ack", "psh", "rst", "syn", "fin", "serviceEncoded", "directionEncoded")).setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(protoIndexer, protoEncoder, serviceIndexer, serviceEncoder, directionIndexer, directionEncoder, scaleAssembler, scaler, featureAssembler))
pipeline.write.overwrite().save(config.getString("pipeline"))
定義k均值,加載預定義的預處理流水線,添加k均值到流水線:
val kmeans = new KMeans().setK(40).setTol(1.0e-6).setFeaturesCol("features")
val pipelineStages = Pipeline.load(config.getString("pipeline")).getStages
val pipeline = new Pipeline().setStages(pipelineStages ++ Array(kmeans))
加載數據分區,計算特徵,擬合管道,獲得k-means模型並顯示第一個集羣中心的大小,例如:
(1 to 7 by 1).map { day =>
val data = sparkContext.textFile("path/to/data/" + day + "/")
val rawFeatures = data.map(extractFeatures....).toDF(featureHeaders: _*)
val model = pipeline.fit(rawFeatures)
val kmeansModel = model.stages(model.stages.size - 1).asInstanceOf[KMeansModel]
println(kmeansModel.clusterCenters(0).size)
}
對於不同的分區,集羣中心具有不同的維度(但對於分區內的40個集羣中的每個集羣都是相同的)。所以我無法計算它們之間的距離。我會懷疑它們的尺寸都是相同的(即我的歐幾里德空間的大小是13,因爲我有13個特徵)。但它給我奇怪的數字,我不明白。
我將提取的特徵向量保存到文件中以檢查它們。他們的格式是懷疑的。每個功能都存在。
任何想法我做錯了或如果我有一個誤解?謝謝!