2017-01-15 82 views
0

我試圖使用在運行時讀取的模式文件將輸入從文本文件轉換爲數據幀。 我輸入的文本文件看起來像這樣:動態地將textFile轉換爲dataFrame

John,23 
Charles,34 

模式文件看起來是這樣的:

name:string 
age:integer 

這是我的嘗試:

object DynamicSchema { 
    def main(args: Array[String]) { 
    val inputFile = args(0) 
    val schemaFile = args(1) 
    val schemaLines = Source.fromFile(schemaFile, "UTF-8").getLines().map(_.split(":")).map(l => l(0) -> l(1)).toMap 
    val spark = SparkSession.builder() 
     .master("local[*]") 
     .appName("Dynamic Schema") 
     .getOrCreate() 
    import spark.implicits._ 
    val input = spark.sparkContext.textFile(args(0)) 
    val schema = spark.sparkContext.broadcast(schemaLines) 
    val nameToType = { 
     Seq(IntegerType,StringType) 
     .map(t => t.typeName -> t).toMap 
    } 
    println(nameToType) 
    val fields = schema.value 
     .map(field => StructField(field._1, nameToType(field._2), nullable = true)).toSeq 
    val schemaStruct = StructType(fields) 
    val rowRDD = input 
     .map(_.split(",")) 
     .map(attributes => Row.fromSeq(attributes)) 
    val peopleDF = spark.createDataFrame(rowRDD, schemaStruct) 
    peopleDF.printSchema() 

    // Creates a temporary view using the DataFrame 
    peopleDF.createOrReplaceTempView("people") 

    // SQL can be run over a temporary view created using DataFrames 
    val results = spark.sql("SELECT name FROM people") 
    results.show() 
    } 
} 

雖然printSchema得到期望的結果,result.show出錯。我認爲年齡字段實際上需要使用toInt進行轉換。當架構僅在運行時可用時,是否有辦法實現相同的功能?

+0

請發佈錯誤日誌。 –

回答

1

更換

val input = spark.sparkContext.textFile(args(0)) 

val input = spark.read.schema(schemaStruct).csv(args(0)) 

和架構定義後移動。

+0

謝謝!像魅力一樣工作。只是一個相關的問題。我是如何構建StructFields和StructType的正確方法?還是有更好或更優雅的解決方案? – Shasankar

+0

'broadcast'沒有意義。其餘的看起來很合理。 – user7337271