Spark Scala reduceByKey - how to reference keys specified in the configuration file?

I have a dataset with the following schema:
dataset.printSchema()
|-- id: string (nullable = true)
|-- feature1: double (nullable = true)
|-- feature2: double (nullable = true)
|-- feature3: double (nullable = true)
|-- feature4: double (nullable = true)
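For context: the code further down accesses fields as x.id and x.feature1, so the Dataset is presumably typed to a case class along these lines (the name Record is my assumption; it is not shown in the question):

case class Record(
  id: String,
  feature1: Double,
  feature2: Double,
  feature3: Double,
  feature4: Double
)
// e.g. val dataset: Dataset[Record] = spark.read.parquet("...").as[Record]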
In my application.conf I have defined the subset of keys that should be transformed with reduceByKey:
keyInfo {
keysToBeTransformed = "feature1,feature2"
}
I can load these keys in my main object:
import com.typesafe.config.{Config, ConfigFactory}

val config : Config = ConfigFactory.load()
val keys : Array[String] = config.getString("keyInfo.keysToBeTransformed").split(",")
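As a side note, HOCON also supports lists, so the keys could instead be declared as a list in application.conf and read with getStringList (just an alternative sketch, not required for the question):

import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.JavaConverters._

// application.conf (alternative form):
// keyInfo {
//   keysToBeTransformed = ["feature1", "feature2"]
// }
val config: Config = ConfigFactory.load()
val keys: Array[String] =
  config.getStringList("keyInfo.keysToBeTransformed").asScala.toArray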
For these keys, I need to compute the mean per id in the dataset and collect the results into an array. Currently I use the following approach:
val meanFeature1 : Array[Double] = dataset.map(x => (x.id, x.feature1)).rdd
.mapValues{z => (z,1)}
.reduceByKey{(x,y) => (x._1 + y._1, x._2 + y._2)}
.map(x => {
val temp = x._2
val total = temp._1
val count = temp._2
(x._1, total/count)
}).collect().sortBy(_._1).map(_._2)
val meanFeature2 : Array[Double] = dataset.map(x => (x.id, x.feature2)).rdd
.mapValues{z => (z,1)}
.reduceByKey{(x,y) => (x._1 + y._1, x._2 + y._2)}
.map(x => {
val temp = x._2
val total = temp._1
val count = temp._2
(x._1, total/count)
}).collect().sortBy(_._1).map(_._2)
The problem with the above approach is that it does not reference the keys specified in my application.conf: the computation does not adapt dynamically when the keys specified in application.conf are changed.
How can I achieve this?
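One possible way to make the computation follow the configured keys (a sketch, assuming a DataFrame-style aggregation is acceptable here; the Map result shape is my own choice) is to build one avg expression per key from application.conf:

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.functions.{avg, col}

val keys: Array[String] =
  ConfigFactory.load().getString("keyInfo.keysToBeTransformed").split(",")

// One avg(...) per configured key, so changing application.conf changes the aggregation.
val aggExprs = keys.map(k => avg(col(k)).as(k))

// Single aggregation pass and a single collect(); ordering by id keeps the
// arrays aligned with the per-feature results above.
val rows = dataset
  .groupBy("id")
  .agg(aggExprs.head, aggExprs.tail: _*)
  .orderBy("id")
  .collect()

// Per-id means for each configured key.
val means: Map[String, Array[Double]] =
  keys.map(k => k -> rows.map(_.getAs[Double](k))).toMap

This computes all configured means in one groupBy/agg pass and calls collect() only once.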
Great, thanks! You only need to collect() once, and your solution is also more performant. – kanimbla