2016-10-10 92 views
0

我想在加載多個輸入文件到一個單一的數據框:加載多個輸入文件到斯卡拉一個數據幀/星火1.6

val inputs = List[String]("input1.txt", "input2.txt", "input3.txt") 

val dataFrames = for (
    i <- inputs; 
    df <- sc.textFile(i).toDF() 
) yield {df} 

val inputDataFrame = unionAll(dataFrames, sqlContext) 

// union of all given DataFrames 
private def unionAll(dataFrames: Seq[DataFrame], sqlContext: SQLContext): DataFrame = dataFrames match { 
    case Nil => sqlContext.emptyDataFrame 
    case head :: Nil => head 
    case head :: tail => head.unionAll(unionAll(tail, sqlContext)) 
} 

編譯器說

Error:(40, 8) type mismatch; 
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 
required: scala.collection.GenTraversableOnce[?] 
    df <- sc.textFile(i).toDF() 
    ^

任何想法?

回答

0

首先,SQLContext.read.text(...)接受多個文件名參數,所以你可以簡單地做:

val inputs = List[String]("input1.txt", "input2.txt", "input3.txt") 
val inputDataFrame = sqlContext.read.text(inputs: _*) 

或者:

val inputDataFrame = sqlContext.read.text("input1.txt", "input2.txt", "input3.txt") 

至於你的代碼 - 當你寫:

val dataFrames = for (
    i <- inputs; 
    df <- sc.textFile(i).toDF() 
) yield df 

它被翻譯成:

inputs.flatMap(i => sc.textFile(i).toDF().map(df => df)) 

哪個不能編譯,因爲flatMap期望返回GenTraversableOnce[?]的功能,而提供的函數返回一個RDD[Row](見的DataFrame.map簽名)。換句話說,當你編寫df <- sc.textFile(i).toDF()時,你實際上是在數據幀中將每個行的行取出,然後用這些行產生一個新的RDD,這不是你想要的。

你所試圖做的是簡單的:

val dataFrames = for (
    i <- inputs; 
) yield sc.textFile(i).toDF() 

但是,正如開頭所提到的,推薦的方法是使用sqlContext.read.text

相關問題