
Is there a way with Spark SQL to transform an org.apache.spark.sql.DataFrame like this, converting some rows into columns:

Predictor icaoCode num1 num2 
P1   OTHH  1.1 1.2 
P1   ZGGG  2.1 2.2 
P2   OTHH  3.1 3.2 
P2   ZGGG  4.1 4.2 
P3   OTHH  5.1 5.2 
P3   ZGGG  6.1 6.2 
.   .   .  . 
.   .   .  . 
.   .   .  . 

into a DataFrame like this?

icaoCode P1.num1 P1.num2 P2.num1 P2.num2 P3.num1 P3.num2 ... 
OTHH   1.1  1.2  3.1  3.2  5.1  5.2 ... 
ZGGG   2.1  2.2  4.1  4.2  6.1  6.2 ... 
.    .  .  .  .  .  . ...  
.    .  .  .  .  .  . ...  
.    .  .  .  .  .  . ...  

There can be an arbitrary number of values for Predictor and for icaoCode.

Answer


With Spark 1.6.0 there is a pivot function to transform/transpose your data. In your case it takes some preprocessing to get the data ready for the pivot. Here is an example of how I would do it:

import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.functions.{col, lit, udf} 

def doPivot(sc: SparkContext): Unit = { 
    val sqlContext: SQLContext = new SQLContext(sc) 

    // dummy data 
    val r1 = Input("P1", "OTHH", 1.1, 1.2) 
    val r2 = Input("P1", "ZGGG", 2.1, 2.2) 
    val r3 = Input("P2", "OTHH", 3.1, 3.2) 

    val records = Seq(r1, r2, r3) 
    val df = sqlContext.createDataFrame(records) 

    // prepare data for pivot: build "P1.num1"-style labels for each predictor/num pair 
    val fullName: ((String, String) => String) = (predictor: String, num: String) => { 
      predictor + "." + num 
    } 
    val udfFullName = udf(fullName) 
    val dfFullName = df 
      .withColumn("num1-complete", udfFullName(col("predictor"), lit("num1"))) 
      .withColumn("num2-complete", udfFullName(col("predictor"), lit("num2"))) 

    // melt num1 and num2 into a long format: (icaoCode, num, value) 
    val dfPrepared = dfFullName 
      .select(col("icaoCode"), col("num1") as "num", col("num1-complete") as "value") 
      .unionAll(dfFullName.select(col("icaoCode"), col("num2") as "num", col("num2-complete") as "value")) 

    // transpose/pivot: one column per label; mean() just returns the single value in each group 
    val dfPivoted = dfPrepared.groupBy(col("icaoCode")).pivot("value").mean("num") 
    dfPivoted.show() 
} 

case class Input(predictor: String, icaoCode: String, num1: Double, num2: Double) 
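
For completeness, a minimal sketch of a standalone driver that wires this up (the PivotExample object, app name, and local[*] master are illustrative assumptions, not part of the answer; in spark-shell you can simply pass the predefined sc):

import org.apache.spark.{SparkConf, SparkContext} 

object PivotExample { 
    def main(args: Array[String]): Unit = { 
      // local context just for trying the example; on a cluster, reuse the existing context 
      val conf = new SparkConf().setAppName("pivot-example").setMaster("local[*]") 
      val sc = new SparkContext(conf) 
      doPivot(sc) 
      sc.stop() 
    } 
} 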

The final DataFrame should work for you:

+--------+-------+-------+-------+-------+ 
|icaoCode|P1.num1|P1.num2|P2.num1|P2.num2| 
+--------+-------+-------+-------+-------+ 
|    OTHH|    1.1|    1.2|    3.1|    3.2| 
|    ZGGG|    2.1|    2.2|   null|   null| 
+--------+-------+-------+-------+-------+
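
As a side note, if column names like P1_num1 (underscore instead of dot) are acceptable, the UDF and the unionAll melt step can be skipped, since pivot also accepts several aggregations at once. Below is a sketch of that variant, reusing the sqlContext and the Input case class from above; first(...) just picks the single value present in each group, and the exact output column naming may differ slightly between Spark versions:

import org.apache.spark.sql.functions.first 

val df = sqlContext.createDataFrame(Seq( 
    Input("P1", "OTHH", 1.1, 1.2), 
    Input("P1", "ZGGG", 2.1, 2.2), 
    Input("P2", "OTHH", 3.1, 3.2))) 

// one output column per (predictor, aggregation) pair, e.g. P1_num1, P1_num2, P2_num1, ... 
val dfPivoted = df.groupBy("icaoCode") 
    .pivot("predictor") 
    .agg(first("num1") as "num1", first("num2") as "num2") 
dfPivoted.show() 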