2017-08-10 64 views
0

雖然有人已經詢問了有關計算Weighted Average in Spark的問題,但在此問題中,我詢問的是使用數據集/數據框而不是RDD。使用無UDF的Spark數據集的加權平均值

如何計算Spark中的加權平均值?我有兩列:計數和以前的平均值:

case class Stat(name:String, count: Int, average: Double) 
val statset = spark.createDataset(Seq(Stat("NY", 1,5.0), 
          Stat("NY",2,1.5), 
          Stat("LA",12,1.0), 
          Stat("LA",15,3.0))) 

我想能夠計算的加權平均值是這樣的:

display(statset.groupBy($"name").agg(sum($"count").as("count"), 
        weightedAverage($"count",$"average").as("average"))) 

可以使用一個UDF親近:

val weightedAverage = udf(
    (row:Row)=>{ 
    val counts = row.getAs[WrappedArray[Int]](0) 
    val averages = row.getAs[WrappedArray[Double]](1) 
    val (count,total) = (counts zip averages).foldLeft((0,0.0)){ 
     case((cumcount:Int,cumtotal:Double),(newcount:Int,newaverage:Double))=>(cumcount+newcount,cumtotal+newcount*newaverage)} 
    (total/count) // Tested by returning count here and then extracting. Got same result as sum. 
    } 
) 

display(statset.groupBy($"name").agg(sum($"count").as("count"), 
        weightedAverage(struct(collect_list($"count"), 
            collect_list($"average"))).as("average"))) 

(感謝回答Passing a list of tuples as a parameter to a spark udf in scala幫忙寫這)

福利局ies:使用這些進口:

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 
import scala.collection.mutable.WrappedArray 

是否有一種方法可以通過內置列函數而不是UDF來完成此操作? UDF感覺笨重,如果數字變大,你必須將Int's轉換成Long's。

回答

1

看起來你可以分兩次做到這一點:

val totalCount = statset.select(sum($"count")).collect.head.getLong(0) 

statset.select(lit(totalCount) as "count", sum($"average" * $"count"/lit(totalCount)) as "average").show 

或者,包括您剛纔添加的GROUPBY:

display(statset.groupBy($"name").agg(sum($"count").as("count"), 
        sum($"count"*$"average").as("total")) 
       .select($"name",$"count",($"total"/$"count"))) 
+0

在我實際的代碼我有一個GROUPBY ......不過,這可能會工作... –

+0

我會在第二次聚合中添加總數作爲另一列,然後在最後進行分割。第二遍需要通過少得多的數據。 –

+0

@MichelLemay:謝謝!這正是我需要慢慢思考的地方。我建議您對答案進行編輯,這也適用於groupBy。 –