2017-04-06 64 views
0

我與下面的模式的數據集:Spark Scala reduceByKey - 如何引用配置文件中指定的鍵?

dataset.printSchema() 
    |-- id: string (nullable = true) 
    |-- feature1: double (nullable = true) 
    |-- feature2: double (nullable = true) 
    |-- feature3: double (nullable = true) 
    |-- feature4: double (nullable = true) 

以我application.conf我已經定義鍵的子集,應使用reduceByKey轉化:

keyInfo { 
    keysToBeTransformed = "feature1,feature2" 
} 

我可以這些密鑰加載到我的主要對象:

val config : Config = ConfigFactory.load() 
val keys : Array[String] = config.getString("keyInfo.keysToBeTransformed").split(",") 

對於這些鍵,我需要計算數據集中每個id的均值並將結果收集到一個數組中。目前,我用下面的辦法:

val meanFeature1 : Array[Double] = dataset.map(x => (x.id, x.feature1)).rdd 
    .mapValues{z => (z,1)} 
    .reduceByKey{(x,y) => (x._1 + y._1, x._2 + y._2)} 
    .map(x => { 
     val temp = x._2 
     val total = temp._1 
     val count = temp._2 
     (x._1, total/count) 
    }).collect().sortBy(_._1).map(_._2), 

    val meanFeature2 : Array[Double] = dataset.map(x => (x.id, x.feature2)).rdd 
    .mapValues{z => (z,1)} 
    .reduceByKey{(x,y) => (x._1 + y._1, x._2 + y._2)} 
    .map(x => { 
     val temp = x._2 
     val total = temp._1 
     val count = temp._2 
     (x._1, total/count) 
    }).collect().sortBy(_._1).map(_._2) 

上述方法的問題是,它不會對我的application.conf指定的鍵的引用(計算不動態改變在密鑰再 - 在application.conf中指定)

我該如何做到這一點?

回答

1

我認爲DataFrame API更適合這種情況,因爲它更好地支持按名稱動態訪問列。和轉換DatasetDataFrame很簡單:

val averagesPerId: Array[Array[Double]] = dataset 
    .groupBy("id") // this also converts to DataFrame 
    .avg(keys: _*) // create average for each key - creates a "avg(featureX)" column for each featureX key 
    .sort("id") 
    .map(r => keys.map(col => r.getAs[Double](s"avg($col)"))) // map Rows into Array[Double], one for each ID 
    .collect() 

// transposing the result to create an array where each row relates to a single key, 
// and mapping each row to its key: 
val averagesPerKey: Map[String, Array[Double]] = keys.zip(averagesPerId.transpose(identity)).toMap 

// for example, if `feature1` was in `keys`: 
val meanFeature1 = averagesPerKey("feature1") 
+0

偉大的,謝謝!你只需要收集()一次,你的解決方案也更具性能。 – kanimbla

0

我在此期間想出了另一個類似的解決方案將是如下:

val meanFeatures : Array[Array[Double]] = keys.map(col => {dataset 
    .groupBy("id") 
    .agg(avg(col)) 
    .as[(String,Double)] 
    .sort("id") 
    .collect().map(_._2) 
}) 
val meanFeature1 : Array[Double] = meanFeatures(0) 
相關問題