2017-09-07 64 views
0

我有一個循環,它在每次迭代中生成行。我的目標是創建一個具有給定模式的數據框,該數據框僅包含那些行。我心裏有一組要遵循的步驟,但我不能夠在新在每次循環迭代Spark - 從循環中生成的行列表創建數據框

我嘗試以下方法添加到List[Row]

var listOfRows = List[Row]() 

val dfToExtractValues: DataFrame = ??? 

dfToExtractValues.foreach { x => 

    //Not really important how to generate here the variables 
    //So to simplify all the rows will have the same values 

    var col1 = "firstCol" 
    var col2 = "secondCol" 
    var col3 = "thirdCol" 

    val newRow = RowFactory.create(col1,col2,col3) 

    //This step I am not able to do 
    //listOfRows += newRow  -> Just for strings 
    //listOfRows.add(newRow)  -> This add doesnt exist, it is a addString 
    //listOfRows.aggregate(1)(newRow)  -> This is not how aggreage works... 
} 


val rdd = sc.makeRDD[RDD](listOfRows) 

val dfWithNewRows = sqlContext.createDataFrame(rdd, myOriginalDF.schema) 

有人能告訴我我做錯了什麼,或者我在改變生成一個數據框的方法時會改變什麼?

也許有更好的方法來收集行而不是List [Row]。但是,我需要將其他類型的集合轉換爲數據框。

回答

1

有人能告訴我什麼我做錯了

瓶蓋

首先它看起來像你的編程指南中跳過Understanding Closures。任何嘗試修改通過閉包傳遞的變量都是徒勞的。您所能做的只是修改副本,並且更改不會全局反映出來。

變量不使對象變更:

var listOfRows = List[Row]() 

創建一個變量。分配List是一樣不變的。如果在Spark背景是不是你可以創建一個新的List並重新分配:

listOfRows = newRow :: listOfRows 

請注意,我們不perpend追加 - 你不想追加到循環列表。

當您想共享數據(例如Akka中的常見模式)時,帶有不可變對象的變量很有用,但在Spark中沒有多少應用程序。

讓事情分佈:

最後從不取數據的驅動器只是爲了再次分發。您還應該避免在RDDsDataFrames之間進行不必要的轉換。最好是使用DataFrame運營商一路:

dfToExtractValues.select(...) 

但如果你需要更復雜的東西map

import org.apache.spark.sql.catalyst.encoders.RowEncoder 

dfToExtractValues.map(x => ...)(RowEncoder(schema))