2017-01-13 28 views
2

我使用Spark 2.0.2。我有數據是按日劃分的。我想將彼此獨立的不同分區聚類在一起,並比較聚類中心(計算它們之間的距離)以查看聚類將如何隨時間變化。Spark MLlib集羣中心具有不同的維度

我爲每個parition做了完全相同的預處理(縮放,一個熱門編碼等)。我爲此使用了預定義的管道,它在「正常」學習和預測環境中完美工作。但是當我想計算聚類中心之間的距離時,不同分區的相應向量具有不同的大小(不同的維度)。

一些代碼片斷:

預處理管線是建立這樣的:

val protoIndexer = new StringIndexer().setInputCol("protocol").setOutputCol("protocolIndexed").setHandleInvalid("skip") 
val serviceIndexer = new StringIndexer().setInputCol("service").setOutputCol("serviceIndexed").setHandleInvalid("skip") 
val directionIndexer = new StringIndexer().setInputCol("direction").setOutputCol("directionIndexed").setHandleInvalid("skip") 

val protoEncoder = new OneHotEncoder().setInputCol("protocolIndexed").setOutputCol("protocolEncoded") 
val serviceEncoder = new OneHotEncoder().setInputCol("serviceIndexed").setOutputCol("serviceEncoded") 
val directionEncoder = new OneHotEncoder().setInputCol("directionIndexed").setOutputCol("directionEncoded") 

val scaleAssembler = new VectorAssembler().setInputCols(Array("duration", "bytes", "packets", "tos", "host_count", "srv_count")).setOutputCol("scalableFeatures") 
val scaler = new StandardScaler().setInputCol("scalableFeatures").setOutputCol("scaledFeatures") 
val featureAssembler = new VectorAssembler().setInputCols(Array("scaledFeatures", "protocolEncoded", "urgent", "ack", "psh", "rst", "syn", "fin", "serviceEncoded", "directionEncoded")).setOutputCol("features") 
val pipeline = new Pipeline().setStages(Array(protoIndexer, protoEncoder, serviceIndexer, serviceEncoder, directionIndexer, directionEncoder, scaleAssembler, scaler, featureAssembler)) 
pipeline.write.overwrite().save(config.getString("pipeline")) 

定義k均值,加載預定義的預處理流水線,添加k均值到流水線:

val kmeans = new KMeans().setK(40).setTol(1.0e-6).setFeaturesCol("features") 
val pipelineStages = Pipeline.load(config.getString("pipeline")).getStages 
val pipeline = new Pipeline().setStages(pipelineStages ++ Array(kmeans)) 

加載數據分區,計算特徵,擬合管道,獲得k-means模型並顯示第一個集羣中心的大小,例如:

(1 to 7 by 1).map { day => 
    val data = sparkContext.textFile("path/to/data/" + day + "/") 
    val rawFeatures = data.map(extractFeatures....).toDF(featureHeaders: _*) 
    val model = pipeline.fit(rawFeatures) 

    val kmeansModel = model.stages(model.stages.size - 1).asInstanceOf[KMeansModel] 
    println(kmeansModel.clusterCenters(0).size) 
} 

對於不同的分區,集羣中心具有不同的維度(但對於分區內的40個集羣中的每個集羣都是相同的)。所以我無法計算它們之間的距離。我會懷疑它們的尺寸都是相同的(即我的歐幾里德空間的大小是13,因爲我有13個特徵)。但它給我奇怪的數字,我不明白。

我將提取的特徵向量保存到文件中以檢查它們。他們的格式是懷疑的。每個功能都存在。

任何想法我做錯了或如果我有一個誤解?謝謝!

回答

1

跳過的事實,KMeans is not a good choice for processing categorical data您的代碼不保證:

  • 同樣的指數 - 批次之間的功能關係。 StringIndexer按照頻率分配標籤。最常見的字符串編碼爲0,最不常見的爲numLabels -
  • 批次之間的相同數量的inidces以及相同形狀的單熱編碼和組裝矢量。矢量的大小等於根據OneHotEncoder中參數dropLast的值調整的唯一標籤的數量。

因此,編碼矢量可能具有不同的尺寸和批次間的解釋。

如果你想要一致的編碼,你需要持久的字典映射,以確保批次之間的索引一致。

相關問題