我試圖使用在運行時讀取的模式文件將輸入從文本文件轉換爲數據幀。 我輸入的文本文件看起來像這樣:動態地將textFile轉換爲dataFrame
John,23
Charles,34
模式文件看起來是這樣的:
name:string
age:integer
這是我的嘗試:
object DynamicSchema {
def main(args: Array[String]) {
val inputFile = args(0)
val schemaFile = args(1)
val schemaLines = Source.fromFile(schemaFile, "UTF-8").getLines().map(_.split(":")).map(l => l(0) -> l(1)).toMap
val spark = SparkSession.builder()
.master("local[*]")
.appName("Dynamic Schema")
.getOrCreate()
import spark.implicits._
val input = spark.sparkContext.textFile(args(0))
val schema = spark.sparkContext.broadcast(schemaLines)
val nameToType = {
Seq(IntegerType,StringType)
.map(t => t.typeName -> t).toMap
}
println(nameToType)
val fields = schema.value
.map(field => StructField(field._1, nameToType(field._2), nullable = true)).toSeq
val schemaStruct = StructType(fields)
val rowRDD = input
.map(_.split(","))
.map(attributes => Row.fromSeq(attributes))
val peopleDF = spark.createDataFrame(rowRDD, schemaStruct)
peopleDF.printSchema()
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
results.show()
}
}
雖然printSchema得到期望的結果,result.show出錯。我認爲年齡字段實際上需要使用toInt進行轉換。當架構僅在運行時可用時,是否有辦法實現相同的功能?
請發佈錯誤日誌。 –