在Apache Spark中有input_file_name函數,我用它將新列添加到Dataset中,並將其與當前正在處理的文件的名稱相加。UDF從Spark SQL中的路徑中僅提取文件名
問題是我想以某種方式定製此函數以僅返回文件名,在S3上省略它的完整路徑。
現在,我在第二步驟中使用地圖功能做更換的路徑:
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
val baseName: String = FilenameUtils.getBaseName(fileName)
val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
this.valueOf(tmpFileName)
}
但我想使用類似
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)
'.withColumn( 「input_file_name」,get_only_file_name(input_file_name))'。這裏'get_only_file_name'是udf。 – mrsrinivas