2016-06-09 51 views
0

我需要使用Scala API在Spark中編寫一個場景。 我將一個用戶定義的函數傳遞給一個數據框,它逐個處理數據框的每一行並返回元組(Row,Row)。我如何將RDD(行,行)更改爲Dataframe(行)?見下面的代碼示例 -Spark - 如何將地圖功能輸出(行,行)元組轉換爲一個Dataframe

**Calling map function-** 
    val df_temp = df_outPut.map { x => AddUDF.add(x,date1,date2)} 
**UDF definition.** 
    def add(x: Row,dates: String*): (Row,Row) = { 
...................... 
........................ 
    var result1,result2:Row = Row() 
.......... 
    return (result1,result2) 

現在df_temp是一個RDD(Row1,Row2)。我的要求是通過將元組元素分解爲1個RDD或Dataframe的記錄來使其成爲一個RDD或Dataframe RDD(行)。感謝你的幫助。

+0

如何將兩行元素組合起來?第二個列應該附加到第一個列?可能在兩行中都存在共同的列?沒有這些信息,問題就不清楚了。 –

回答

2

您可以使用flatMap扁平化你的元組行,說如果我們從這個例子RDD開始:

rddExample.collect() 
// res37: Array[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = Array(([1,2],[3,4]), ([2,1],[4,2])) 

val flatRdd = rddExample.flatMap{ case (x, y) => List(x, y) } 
// flatRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[45] at flatMap at <console>:35 

要將其轉換成數據幀。

import org.apache.spark.sql.types.{StructType, StructField, IntegerType} 

val schema = StructType(StructField("x", IntegerType, true):: 
         StructField("y", IntegerType, true)::Nil)  
val df = sqlContext.createDataFrame(flatRdd, schema) 
df.show 
+---+---+ 
| x| y| 
+---+---+ 
| 1| 2| 
| 3| 4| 
| 2| 1| 
| 4| 2| 
+---+---+ 
+0

像魅力一樣工作。萬分感謝 :) –

相關問題