
This is how I am using a UDF on a Spark DataFrame, but the UDF is not working (Spark Scala):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))


val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show() 

The idea is to get the file name, but when I run this I get the following error:

<console>:545: error: not found: value get_cus_val 
     df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show() 

But I am able to get the file name with its full path if I do this:

df1result.withColumn("DataPartition", input_file_name).show() 

Any idea what I am missing?

Answer


This does not work because you only registered the function for SQL. You can try:

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))

// input_file_name() needs parentheses when used inside a SQL expression
df1result.selectExpr("*", "get_cus_val(input_file_name()) as DataPartition").show()
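
Because spark.udf.register returns the UserDefinedFunction it creates (on Spark 2.x), binding that return value to a val also makes the original DataFrame-API call from the question work. A minimal sketch, reusing df1result from above:

import org.apache.spark.sql.functions.input_file_name

// the returned handle accepts Column arguments, so no SQL expression is required
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))

df1result.withColumn("DataPartition", get_cus_val(input_file_name())).show()

Either form produces the same DataPartition column; the selectExpr variant simply routes the call through the SQL parser instead of the typed DataFrame API.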