Spark custom aggregation >= 2.0 (Scala)

I have the following dataset:
val myDS = List(("a",1,1.1), ("b",2,1.2), ("a",3,3.1), ("b",4,1.4), ("a",5,5.1)).toDS
// and aggregation
// myDS.groupByKey(t2 => t2._1).agg(myAvg).collect()
I want to write a custom aggregation function myAvg that takes Tuple3 arguments and returns sum(_._2)/sum(_._3). I know it can be computed in other ways, but I want to write a custom aggregation. I wrote something like this:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
val myAvg = new Aggregator[Tuple3[String, Integer, Double],
                           Tuple2[Integer, Double],
                           Double] {
  def zero: Tuple2[Integer, Double] = Tuple2(0, 0.0)
  def reduce(agg: Tuple2[Integer, Double],
             a: Tuple3[String, Integer, Double]): Tuple2[Integer, Double] =
    Tuple2(agg._1 + a._2, agg._2 + a._3)
  def merge(agg1: Tuple2[Integer, Double],
            agg2: Tuple2[Integer, Double]): Tuple2[Integer, Double] =
    Tuple2(agg1._1 + agg2._1, agg1._2 + agg2._2)
  def finish(res: Tuple2[Integer, Double]): Double = res._1 / res._2
  def bufferEncoder: Encoder[(Integer, Double)] =
    Encoders.tuple(Encoders.INT, Encoders.scalaDouble)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}.toColumn()
Unfortunately I get the following error:
java.lang.RuntimeException: Unsupported literal type class scala.runtime.BoxedUnit()
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.Column.apply(Column.scala:217)
What is wrong?

Additionally, on my local Spark 2.1 I get a warning:

warning: there was one deprecation warning; re-run with -deprecation for details

What in my code is deprecated?

Thanks for any advice.
A minute later... the error message was accurate after all: the problem was in 'column()' => 'column' – SmallerThan
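To illustrate the likely root cause: `toColumn` is a parameterless method, so writing `}.toColumn()` applies the extra `()` to the returned `Column`, i.e. calls `Column.apply` on the unit value, which Spark then tries to convert into a literal, matching the `BoxedUnit` in the stack trace. Dropping the parentheses (`}.toColumn`) fixes it. The buffer logic itself is sound, which can be checked outside Spark; below is a minimal plain-Scala sketch of the zero/reduce/merge/finish semantics (object and names are illustrative, not part of the Spark API):

```scala
object MyAvgSketch {
  // Buffer: (running sum of _2, running sum of _3)
  def zero: (Int, Double) = (0, 0.0)

  // Fold one input row into the buffer
  def reduce(agg: (Int, Double), a: (String, Int, Double)): (Int, Double) =
    (agg._1 + a._2, agg._2 + a._3)

  // Combine two partial buffers (e.g. from different partitions)
  def merge(agg1: (Int, Double), agg2: (Int, Double)): (Int, Double) =
    (agg1._1 + agg2._1, agg1._2 + agg2._2)

  // Final result: sum(_._2) / sum(_._3)
  def finish(res: (Int, Double)): Double = res._1 / res._2

  def main(args: Array[String]): Unit = {
    // The "a" rows from the question's dataset
    val rows = List(("a", 1, 1.1), ("a", 3, 3.1), ("a", 5, 5.1))
    val buf = rows.foldLeft(zero)(reduce)
    println(finish(buf)) // (1+3+5) / (1.1+3.1+5.1) ≈ 0.9677
  }
}
```

The same four methods are what the Spark `Aggregator` contract requires; only the encoders and the final `.toColumn` are Spark-specific.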