
Splitting a Spark dataframe and calculating average based on a column value

I have two dataframes. The first dataframe, classRecord, has 10 different entries like the following:

Class, Calculation 
first, Average 
Second, Sum 
Third, Average 

The second dataframe, studentRecord, has around 50K entries like the following:

Name, height, Camp, Class 
Shae, 152, yellow, first 
Joe, 140, yellow, first 
Mike, 149, white, first 
Anne, 142, red, first 
Tim, 154, red, Second 
Jake, 153, white, Second 
Sherley, 153, white, Second 

From the second dataframe, I want to perform height calculations per camp, with the type of calculation depending on the class (for class first: average; for class Second: sum; and so on). For example, if the class is first, compute the average height for yellow, white, etc. I tried the following:

import org.apache.spark.rdd.RDD
import sqlContext.implicits._

// Function to calculate the average height keyed by name
def averageOnName(splitFrame: org.apache.spark.sql.DataFrame): Array[(String, Double)] = {
    val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
    pairedRDD.mapValues(x => (x, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues(y => 1.0 * y._1 / y._2)
      .collect
}

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

// Required schema for further modifications
val schema = StructType(
    StructField("name", StringType, false) ::
    StructField("avg", DoubleType, false) :: Nil)

// foreach loop over each class type
classRecord.rdd.foreach { classRow =>
    // Filter students based on camps
    var campYellow = studentRecord.filter($"Camp" === "yellow")
    var campWhite  = studentRecord.filter($"Camp" === "white")
    var campRed    = studentRecord.filter($"Camp" === "red")

    // Since I know that the calculation for class first is average, showing the calculation only for class first
    val avgcampYellow = averageOnName(campYellow)
    val avgcampWhite  = averageOnName(campWhite)
    val avgcampRed    = averageOnName(campRed)

    // Convert each result back to a dataframe, then union them all
    val rddYellow = sc.parallelize(avgcampYellow).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
    var dfYellow  = sqlContext.createDataFrame(rddYellow, schema)
    val rddWhite  = sc.parallelize(avgcampWhite).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
    var dfWhite   = sqlContext.createDataFrame(rddWhite, schema)
    // Union of yellow and white camp data
    var dfYellWhite = dfYellow.union(dfWhite)
    val rddRed = sc.parallelize(avgcampRed).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
    var dfRed  = sqlContext.createDataFrame(rddRed, schema)
    // Union of yellow, white, and red camp data
    var dfYellWhiteRed = dfYellWhite.union(dfRed)
    // Other modifications and final result to Hive
}

This is where I am struggling:

1. Hardcoding yellow, red, and white; there may be other camp types as well.
2. Filtering the same dataframe many times.
3. Not being able to figure out how to vary the calculation according to each class's calculation type.

Help is appreciated. Thanks.


If I understand correctly, you want the average or sum of the height depending on Camp and Class? How about computing the calculations for all camp/class combinations, putting them in a single dataframe, and then reading the 'classRecord' df separately? – Shaido

Answer


You can simply do both the average and sum calculations for all combinations of Class/Camp, then parse the classRecord dataframe separately and extract the rows you need. This is easily done using the groupBy() method and aggregating the values.

Using your example dataframes:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, avg, collect_list}

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

studentRecord.show() 

+-------+------+------+------+ 
| Name|height| Camp| Class| 
+-------+------+------+------+ 
| Shae| 152|yellow| first| 
| Joe| 140|yellow| first| 
| Mike| 149| white| first| 
| Anne| 142| red| first| 
| Tim| 154| red|Second| 
| Jake| 153| white|Second| 
|Sherley| 153| white|Second| 
+-------+------+------+------+ 

val df = studentRecord.groupBy("Class", "Camp").agg(
    sum($"height").as("Sum"), 
    avg($"height").as("Average"), 
    collect_list($"Name").as("Names")) 
df.show() 

+------+------+---+-------+---------------+ 
| Class| Camp|Sum|Average|   Names| 
+------+------+---+-------+---------------+ 
| first| white|149| 149.0|   [Mike]| 
| first| red|142| 142.0|   [Anne]| 
|Second| red|154| 154.0|   [Tim]| 
|Second| white|306| 153.0|[Jake, Sherley]| 
| first|yellow|292| 146.0| [Shae, Joe]| 
+------+------+---+-------+---------------+ 

After doing this, you can simply check your original classRecord dataframe for the rows you need. An example of what that can look like, which can be changed depending on your actual requirements:

import org.apache.spark.sql.Row

// Collects the classRecord dataframe as an Array[(String, String)]
val classRecs = classRecord.collect().map{ case Row(clas: String, calc: String) => (clas, calc) }

for ((clas, calc) <- classRecs) {
    // Matches which calculation you want to do
    val df2 = calc match {
        case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average")
        case "Sum"     => df.filter($"Class" === clas).select("Class", "Camp", "Sum")
    }

    // Do something with df2
}
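A note on the design: collect() pulls classRecord onto the driver, which is reasonable here since it only holds around 10 rows; also, the match above will throw a scala.MatchError for any Calculation value other than "Average" or "Sum", so a default case may be worth adding.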

Hope it helps!


Partly something like this; I also need all the names that belong to each case, like "Class, camp, names, average". Even once I get the final DF, how will I decide that for first I need to select the average (discarding the sum), for Second I need the sum (discarding the average), and so on? – Swati
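A minimal sketch of one way to address this in a single pass (not from the original answer; it assumes the aggregated df and the classRecord dataframe shown above, with Calculation values limited to "Average" and "Sum"): join classRecord onto the aggregates and use when/otherwise to keep only the matching value per class.

import org.apache.spark.sql.functions.when

// Join the per-class calculation type onto the aggregates, then pick the
// column matching each row's Calculation value ("Average" or "Sum").
val result = df.join(classRecord, Seq("Class"))
  .withColumn("value", when($"Calculation" === "Average", $"Average").otherwise($"Sum"))
  .select("Class", "Camp", "Names", "value")

result.show()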


I also tried the above solution, and it shows an error: value groupby is not a member of org.apache.spark.rdd.RDD[String]. Thanks. – Swati


@Swati Sorry, it should be groupBy() with a capital 'B'. I have added the list of names to the solution as well. – Shaido