Split a Spark dataframe and calculate average based on a column value
I have two dataframes. The first dataframe, classRecord, has 10 different entries like the following:
Class, Calculation
first, Average
Second, Sum
Third, Average
The second dataframe, studentRecord, has around 50K entries like the following:
Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second
From the second dataframe, based on the class type, I would like to perform the height calculation per camp (for class first: average; class Second: sum; and so on). For example, if the class is first, compute the average for the yellow camp, the white camp, etc. I tried the following:
import org.apache.spark.rdd.RDD

// function to calculate the average height per key (Name)
def averageOnName(splitFrame: org.apache.spark.sql.DataFrame): Array[(String, Double)] = {
  val pairedRDD: RDD[(String, Double)] =
    splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
  pairedRDD.mapValues(x => (x, 1))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .mapValues(y => 1.0 * y._1 / y._2)
    .collect
}
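As an aside, the RDD round-trip above can be expressed directly in the DataFrame API; a minimal sketch under the same column names (the helper name averageOnNameDF is my own):

```scala
import org.apache.spark.sql.functions.avg

// Equivalent of averageOnName: average height per Name,
// without leaving the DataFrame API or collecting to the driver.
def averageOnNameDF(splitFrame: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
  splitFrame.groupBy("Name").agg(avg($"height".cast("double")).as("avg"))
```

This keeps the computation lazy and already returns a dataframe with the schema ("Name", "avg"), so the manual schema/createDataFrame step below would not be needed.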
// required schema for further modifications
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val schema = StructType(
  StructField("name", StringType, false) ::
  StructField("avg", DoubleType, false) :: Nil)
// loop over each class type
classRecord.collect().foreach { classRow =>
  // filter students based on camps
  val campYellow = studentRecord.filter($"Camp" === "yellow")
  val campWhite  = studentRecord.filter($"Camp" === "white")
  val campRed    = studentRecord.filter($"Camp" === "red")
  // since the calculation for class first is average, only that case is shown here
val avgcampYellow = averageOnName(campYellow)
val avgcampWhite = averageOnName(campWhite)
val avgcampRed = averageOnName(campRed)
  // convert each result array back to an RDD of Rows, then to a dataframe, and union them all
  val rddYellow = sc.parallelize(avgcampYellow).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  val dfYellow = sqlContext.createDataFrame(rddYellow, schema)
  val rddWhite = sc.parallelize(avgcampWhite).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  val dfWhite = sqlContext.createDataFrame(rddWhite, schema)
  // union of the yellow and white camp data
  val dfYellWhite = dfYellow.union(dfWhite)
  val rddRed = sc.parallelize(avgcampRed).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  val dfRed = sqlContext.createDataFrame(rddRed, schema)
  // union with the yellow/white camp data
  val dfYellWhiteRed = dfYellWhite.union(dfRed)
  // other modifications, then write the final result to Hive
}
This is where I am struggling:
1. Hardcoding yellow, red and white; there may be other camp types as well.
2. Filtering the same dataframe many times.
3. Not being able to figure out how to vary the calculation according to the class's calculation type.
Any help is appreciated. Thanks.
If I understand it correctly, you want the average or the sum of the height depending on Camp and Class? How about calculating both for all camp/class combinations, putting them in a single dataframe, and then reading off the right one per the 'classRecord' df? – Shaido
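The approach suggested in the comment above could look roughly like this: aggregate both the average and the sum per Class/Camp combination in a single pass, join with classRecord, and keep whichever aggregate the class's Calculation column asks for. A sketch under the column names from the question (the intermediate names perClassCamp and height_result are my own):

```scala
import org.apache.spark.sql.functions.{avg, sum, when, col}

// One pass over studentRecord: both aggregates for every Class/Camp combination.
val perClassCamp = studentRecord
  .groupBy("Class", "Camp")
  .agg(avg("height").as("avg_height"), sum("height").as("sum_height"))

// Join with classRecord and keep the aggregate that each class asks for.
val result = perClassCamp
  .join(classRecord, Seq("Class"))
  .withColumn("height_result",
    when(col("Calculation") === "Average", col("avg_height"))
      .otherwise(col("sum_height")))
  .select("Class", "Camp", "height_result")
```

This addresses all three pain points at once: no camp names are hardcoded (groupBy picks up whatever camps exist), studentRecord is scanned only once, and the per-class calculation type is resolved by the join instead of a loop.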