雖然我試圖爲我們的一個複雜問題創建一個UDAF,但我決定從一個基本的UDAF開始,它返回列。由於我是Spark SQL/Scala的新手,有人可以幫助我,突出我的錯誤。Spark SQL(v2.0)Scala中的UDAF返回空字符串
以下是代碼:
進口org.apache.spark.sql.expressions.MutableAggregationBuffer 進口org.apache.spark.sql.expressions.UserDefinedAggregateFunction 進口org.apache.spark.sql。行進口org.apache.spark.sql.types._ 進口org.apache.spark.sql.types.DataTypes
進口scala.collection._
對象MinhashUdaf延伸UserDefinedAggregateFunction {
倍率DEF inputSchema:org.apache.spark.sql.types.StructType = StructType(StructField( 「值」,StringType)::無)
倍率DEF bufferSchema:StructType = StructType( StructField( 「帶狀皰疹」,(StringType))::無)
倍率DEF數據類型:數據類型=(StringType)
倍率DEF確定性:布爾=真
倍率DEF初始化(緩衝液:MutableAggregationBuffer):單位= { 緩衝液(0)=( 「」)}
倍率DEF更新(緩衝液:MutableAggregationBuffer,輸入:行): 單位= { buffer.update(0,input.toString ())}
倍率DEF合併(緩衝器1:MutableAggregationBuffer,緩衝器2:行): 單位= {}
倍率DEF評估(緩衝液:行):任何= { 緩衝液(0)}}
對於運行上述UDAF,以下是代碼:
DEF主(參數:數組[字符串]){ VAL火花:SparkSession = SparkSession.builder 的.master( 「本地[*]」) .appName(「test」) .getOrCreate();
import spark.implicits._; val df = spark.read.json("people.json") df.createOrReplaceTempView("people") val sqlDF = spark.sql("Select name from people") sqlDF.show() val minhash = df.select(MinhashUdaf(col("name")).as("minhash")) minhash.printSchema() minhash.show(truncate = false)
由於UDAF我返回輸入,因爲它是我應該得到的列「名稱」值的每一行,因爲它是。而在運行上面的字符串時,我返回一個空字符串。
您執行了哪個操作?什麼是預期的輸出?實際產出? – Yaron
@Yaron:我編輯了我的問題以包含來自我運行UDAF的代碼。預期產出: - 列的價值,因爲它是。實際輸出: - 空字符串 – ANDY