2017-10-10 100 views
0

我是 Scala 開發新手，試圖解決以下問題：Scala UDAF 的返回類型是複雜對象的數組

我有一個返回複雜對象數組（字符串 + 字符串數組的結構）的 UDAF。在 update 方法中，buffer 取出的是 WrappedArray 類型，我不知道如何把新值寫回緩衝區。我嘗試將其轉換爲序列，但沒有奏效……

case class variablePairs(val variable1: String, val Respondents: Seq[String]) 

    import java.util 
    import java.util.Collections 

    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.expressions.MutableAggregationBuffer 
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction 
    import org.apache.spark.sql.types.{DataType, DataTypes, StringType, StructType} 

    class MyUDF extends UserDefinedAggregateFunction { 

     override def inputSchema(): StructType = 
     new StructType() 
      .add("variable1", DataTypes.StringType) 
      .add("variable2CSList", DataTypes.StringType) 

     //intermediate schema 
     override def bufferSchema(): StructType = 
     new StructType() 
      .add("Households", DataTypes.createArrayType(
      DataTypes.StringType)) 
      new StructType() 
       .add("variable1",DataTypes.StringType) 
       .add("variable2",DataTypes.createArrayType(
       DataTypes.StringType 
      )) 


     //output schema 
     override def dataType(): DataType = new StructType() 
      new StructType() 
      .add("Households", DataTypes.createArrayType(
       new StructType() 
       .add("variable1",DataTypes.StringType) 
       .add("variable2",DataTypes.createArrayType(
        DataTypes.StringType 
       )))) 


     override def deterministic(): Boolean = true 

     override def initialize(buffer: MutableAggregationBuffer): Unit = { 
     buffer.update(0, Seq[String]()) 
     } 

     override def update(buffer: MutableAggregationBuffer, row: Row): Unit = { 

     val variable1: String = row.getString(0) 
     val variable2CSList:String = row.getString(1); 

     val respondentsIdArray:Array[String] = variable2CSList.split(",") 

     val houseHold:variablePairs = variablePairs(variable1 = variable1, Respondents = respondentsIdArray.toSeq) 

     val wrappedArray = buffer.get(0).asInstanceOf[Seq[String]] 

     val households:Seq[variablePairs] = Seq(houseHold) 

     buffer.update(0,wrappedArray.toArray ++ variable1) 
     } 

     override def merge(buffer: MutableAggregationBuffer, row: Row): Unit = { 
     val oldList = buffer.getList[variablePairs](0); 
     val newList = row.getList[variablePairs](0); 

     buffer.update(0,oldList.addAll(newList)) 
     } 

     override def evaluate(row: Row): AnyRef = { 
     new Tuple1(row.get(0)); 
     } 
    } 

    I got the following error while running this code: 

App > 17/10/10 22:01:48 task-result-getter-1 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 31, ip-10-61-41-163.ec2.internal, executor 3): scala.MatchError: 1 (of class java.lang.Character) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:162) 
App > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
App > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
App > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
App > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
App > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
App > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:162) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) 
App > at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:383) 
App > at org.apache.spark.sql.execution.aggregate.MutableAggregationBufferImpl.update(udaf.scala:246) 
App > at com.turner.audiencematters.udf.RespondentPairUDF.update(RespondentPairUDF.scala:65) 
App > at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:425) 
App > at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171) 
App > at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171) 
App > at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187) 
App > at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181) 
App > at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:122) 
App > at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:157) 
App > at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29) 
App > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
App > at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:150) 
App > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
App > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
App > at org.apache.spark.scheduler.Task.run(Task.scala:99) 
App > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
App > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
App > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
App > at java.lang.Thread.run(Thread.java:745) 

回答

0

我也有類似的問題,但只是想返回Array[String]

這段代碼是有幫助的:

https://gist.github.com/sadikovi/7608c8c7eb5d7fe69a1a

... 
override def dataType: DataType = ArrayType(StringType) 
... 
override def evaluate(buffer: Row): Array[String] = { 
... 
以下是在我的 UDAF 中有效的代碼節選：

我希望這有助於!