UDAF in Spark SQL (v2.0) Scala returns an empty string

Although I am trying to create a UDAF for one of our complex problems, I decided to start with a basic UDAF that simply returns the column as-is. Since I am new to Spark SQL/Scala, could someone help me by pointing out my mistake?

Here is the code:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataTypes

import scala.collection._

object MinhashUdaf extends UserDefinedAggregateFunction {

override def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)

override def bufferSchema: StructType = StructType(StructField("shingles", (StringType)) :: Nil)

override def dataType: DataType = (StringType)

override def deterministic: Boolean = true

override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = ("") }

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer.update(0, input.toString()) }

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {}

override def evaluate(buffer: Row): Any = { buffer(0) } }

Here is the code for running the above UDAF:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

def main(args: Array[String]) {
val spark: SparkSession = SparkSession.builder
  .master("local[*]")
  .appName("test")
  .getOrCreate();

import spark.implicits._; 

val df = spark.read.json("people.json") 
df.createOrReplaceTempView("people") 
val sqlDF = spark.sql("Select name from people") 
sqlDF.show() 

val minhash = df.select(MinhashUdaf(col("name")).as("minhash")) 
minhash.printSchema() 
minhash.show(truncate = false)
}

Since in the UDAF I return the input as-is, I expected to get the value of the "name" column for every row unchanged. Instead, when I run the above, I get back an empty string.
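For illustration only, assuming people.json held records such as the following (the actual file contents are not shown in the question):

{"name":"Michael"}
{"name":"Andy"}

the expectation was that these "name" values would come back unchanged in the "minhash" column, whereas the query actually returned an empty string.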


Which action did you execute? What is the expected output? The actual output? – Yaron


@Yaron:我編輯了我的問題以包含來自我運行UDAF的代碼。預期產出: - 列的價值,因爲它是。實際輸出: - 空字符串 – ANDY

Answer


You did not implement the merge function. In the final aggregation step Spark initializes a fresh buffer (here an empty string) and merges each partition's partial buffer into it; with an empty merge nothing is copied over, so evaluate returns the initial empty string.

With the code below, which implements merge and stores input.get(0) in update instead of input.toString(), you can print the values of the column as required.

object MinhashUdaf extends UserDefinedAggregateFunction { 

override def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil) 

override def bufferSchema: StructType = StructType(StructField("shingles", (StringType)) :: Nil) 

override def dataType: DataType = (StringType) 

override def deterministic: Boolean = true 

override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = ("") } 

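// Fix: store the value of the first input column instead of input.toString()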
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer.update(0, input.get(0)) } 

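// Fix: merge now carries the value from buffer2 into buffer1 (it was a no-op before)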
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1.update(0, buffer2.get(0))} 

override def evaluate(buffer: Row): Any = { buffer(0) } }
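As a minimal way to exercise the corrected UDAF, the sketch below mirrors the runner from the question (the people.json path and "name" column are taken from the question; the wrapper object name is arbitrary):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object MinhashUdafTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
    val df = spark.read.json("people.json")
    // Global aggregate over all rows; with update and merge fixed, the buffer's
    // string value is propagated instead of the initial empty string.
    df.select(MinhashUdaf(col("name")).as("minhash")).show(truncate = false)
    spark.stop()
  }
}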