2017-07-25 71 views
0

我哈瓦一個數據幀,列num是多變的,每column'type爲int,所以我想所有列的總和,所以我想用的:_ *,這是我的代碼:如何在udf中使用scala/spark使用可變參數_ *?

val arr = Array(1,4,3,2,5,7,3,5,4,18) 
    val input=new ArrayBuffer[(Int,Int)]() 
    for(i<-0 until 10){ 
     input.append((i,arr(i%10))) 
    } 

    var df=sc.parallelize(input,3).toDF("value1","value2") 
    val cols=new ArrayBuffer[Column]() 
    val colNames=df.columns 
    for(name<-colNames){ 
     cols.append(col(name)) 
    } 
    val func = udf((s: Int*) => s.sum) 
    df.withColumn("sum",func(cols:_*)).show() 

但我得到一個錯誤:

Error:(101, 27) ')' expected but identifier found. 
    val func = udf((s: Int*) => s.sum) 
Error:(101, 27) ')' expected but identifier found. 
    val func = udf((s: Int*) => s.sum) 

如何使用:_ *在UDF? 除了結果我的是:

+------+------+---+ 
|value1|value2|sum| 
+------+------+---+ 
|  0|  1| 1| 
|  1|  4| 5| 
|  2|  3| 5| 
|  3|  2| 5| 
|  4|  5| 9| 
|  5|  7| 12| 
|  6|  3| 9| 
|  7|  5| 12| 
|  8|  4| 12| 
|  9| 18| 27| 
+------+------+---+ 

回答

2

星火UDF不支持可變長度參數, 這裏是你的問題的解決方案。

import spark.implicits._ 

val input = Array(1,4,3,2,5,7,3,5,4,18).zipWithIndex 

var df=spark.sparkContext.parallelize(input,3).toDF("value2","value1") 

df.withColumn("total", df.columns.map(col(_)).reduce(_ + _)) 

輸出:

+------+------+-----+ 
|value2|value1|total| 
+------+------+-----+ 
|  1|  0| 1| 
|  4|  1| 5| 
|  3|  2| 5| 
|  2|  3| 5| 
|  5|  4| 9| 
|  7|  5| 12| 
|  3|  6| 9| 
|  5|  7| 12| 
|  4|  8| 12| 
| 18|  9| 27| 
+------+------+-----+ 

希望這有助於

1

你可以嘗試VectorAssembler

import org.apache.spark.ml.feature.VectorAssembler 
import breeze.linalg.DenseVector 

val assembler = new VectorAssembler(). 
    setInputCols(Array("your column name")). 
    setOutputCol("allNum") 

val assembledDF = assembler.transform(df) 

assembledDF.show 

+------+------+----------+              
|value1|value2| allNum| 
+------+------+----------+ 
|  0|  1| [0.0,1.0]| 
|  1|  4| [1.0,4.0]| 
|  2|  3| [2.0,3.0]| 
|  3|  2| [3.0,2.0]| 
|  4|  5| [4.0,5.0]| 
|  5|  7| [5.0,7.0]| 
|  6|  3| [6.0,3.0]| 
|  7|  5| [7.0,5.0]| 
|  8|  4| [8.0,4.0]| 
|  9| 18|[9.0,18.0]| 
+------+------+----------+ 

def yourSumUDF = udf((allNum:Vector) => new DenseVector(allNum.toArray).sum) 
assembledDF.withColumn("sum", yourSumUDF($"allNum")).show 

+------+------+----------+----+      
|value1|value2| allNum| sum| 
+------+------+----------+----+ 
|  0|  1| [0.0,1.0]| 1.0| 
|  1|  4| [1.0,4.0]| 5.0| 
|  2|  3| [2.0,3.0]| 5.0| 
|  3|  2| [3.0,2.0]| 5.0| 
|  4|  5| [4.0,5.0]| 9.0| 
|  5|  7| [5.0,7.0]|12.0| 
|  6|  3| [6.0,3.0]| 9.0| 
|  7|  5| [7.0,5.0]|12.0| 
|  8|  4| [8.0,4.0]|12.0| 
|  9| 18|[9.0,18.0]|27.0| 
+------+------+----------+----+ 
2

這可能你所期望的

val func = udf((s: Seq[Int]) => s.sum) 
df.withColumn("sum", func(array(cols: _*))).show() 

其中arrayorg.apache.spark.sql.functions.array

Creates a new array column. The input columns must all have the same data type.

相關問題