0
雖然有人已經詢問了有關計算Weighted Average in Spark的問題,但在此問題中,我詢問的是使用數據集/數據框而不是RDD。使用無UDF的Spark數據集的加權平均值
如何計算Spark中的加權平均值?我有兩列:計數和以前的平均值:
case class Stat(name:String, count: Int, average: Double)
val statset = spark.createDataset(Seq(Stat("NY", 1,5.0),
Stat("NY",2,1.5),
Stat("LA",12,1.0),
Stat("LA",15,3.0)))
我想能夠計算的加權平均值是這樣的:
display(statset.groupBy($"name").agg(sum($"count").as("count"),
weightedAverage($"count",$"average").as("average")))
可以使用一個UDF親近:
val weightedAverage = udf(
(row:Row)=>{
val counts = row.getAs[WrappedArray[Int]](0)
val averages = row.getAs[WrappedArray[Double]](1)
val (count,total) = (counts zip averages).foldLeft((0,0.0)){
case((cumcount:Int,cumtotal:Double),(newcount:Int,newaverage:Double))=>(cumcount+newcount,cumtotal+newcount*newaverage)}
(total/count) // Tested by returning count here and then extracting. Got same result as sum.
}
)
display(statset.groupBy($"name").agg(sum($"count").as("count"),
weightedAverage(struct(collect_list($"count"),
collect_list($"average"))).as("average")))
(感謝回答Passing a list of tuples as a parameter to a spark udf in scala幫忙寫這)
福利局ies:使用這些進口:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.collection.mutable.WrappedArray
是否有一種方法可以通過內置列函數而不是UDF來完成此操作? UDF感覺笨重,如果數字變大,你必須將Int's轉換成Long's。
在我實際的代碼我有一個GROUPBY ......不過,這可能會工作... –
我會在第二次聚合中添加總數作爲另一列,然後在最後進行分割。第二遍需要通過少得多的數據。 –
@MichelLemay:謝謝!這正是我需要慢慢思考的地方。我建議您對答案進行編輯,這也適用於groupBy。 –