2015-09-21 47 views
1

是否可以在Spark Scala中使用非數字值來旋轉表格?我回顧了以下兩個Stack問題。Spark Scala中的樞軸非數字表格

How to pivot DataFrame?

List in the Case-When Statement in Spark SQL

繼「中的案例當列表」的問題,我可以將我的數據,使每個數據類型是列,但有一排中的每個步驟實體數據類型組合。

id tag value 
1  US  foo 
1  UK  bar 
1  CA  baz 
2  US  hoo 
2  UK  hah 
2  CA  wah 

id US UK CA 
1  foo 
1   bar 
1     baz 
2  hoo 
2   hah 
3     wah 

是否有一個「第一個非空」函數可以將每個實體的多行摺疊爲一個?

id US UK CA 
1  foo bar baz 
2  hoo hah wah 

回答

0

您可以考慮aggregate方法(或aggregateByKey)。你只需要編寫適當的函數來獲取每個位置的非空元素。

+0

我曾希望找到一個DataFrame方法來做到這一點,但回到元組的RDD和aggregateByKey最終做到了。感謝您的建議。 –

+1

可用的任何代碼片斷了解它是如何工作的?謝謝 – user299791

+0

對不起,@ user299791,我轉移到另一個項目,直到幾個星期前纔看到您的問題。然後,我花了一段時間來追蹤我用來解決問題的源代碼並將其清理到足以分享。我會用完整的示例類發佈另一個答案。 –

1

下面是一個完整的Scala類,它創建一個樣本數據框,然後將其轉向。這是特定的問題,所以我不知道它通常會有多大用處。也沒有廣泛測試,所以買家要小心。

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext} 
import org.apache.spark.sql.functions.{lit, when} 
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} 

object DemoPivot { 

    def main(args: Array[String]) = { 

    def pivotColumn(df: DataFrame)(t: String): Column = { 
     val col = when(df("tag") <=> lit(t), df("value")) 
     col.alias(t) 
    } 

    def pivotFrame(sqlContext: SQLContext, df: DataFrame): DataFrame = { 
     val tags = df.select("tag").distinct.map(r => r.getString(0)).collect.toList 
     df.select(df("id") :: tags.map(pivotColumn(df)): _*) 
    } 

    def aggregateRows(value: Seq[Option[Any]], agg: Seq[Option[Any]]): Seq[Option[Any]] = { 
     for (i <- 0 until Math.max(value.size, agg.size)) yield i match { 
     case x if x > value.size => agg(x) 
     case y if y > agg.size => value(y) 
     case z if value(z).isEmpty => agg(z) 
     case a => value(a) 
     } 
    } 

    def collapseRows(sqlContext: SQLContext, df: DataFrame): DataFrame = { 
     // RDDs cannot have null elements, so pack into Options and unpack before returning 
     val rdd = df.map(row => (Some(row(0)), row.toSeq.tail.map(element => if (element == null) None else Some(element)))) 
     val agg = rdd.reduceByKey(aggregateRows) 
     val aggRdd = agg.map{ case (key, list) => Row.fromSeq((key.get) :: (list.map(element => if (element.isDefined) element.get else null)).toList) } 
     sqlContext.createDataFrame(aggRdd, df.schema) 
    } 

    val conf = new SparkConf().setAppName("Simple Pivot Demo") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val data = List((1, "US", "foo"), (1, "UK", "bar"), (1, "CA", "baz"), 
        (2, "US", "hoo"), (2, "UK", "hah"), (2, "CA", "wah")) 
    val rows = data.map(d => Row.fromSeq(d.productIterator.toList)) 
    val fields = Array(StructField("id", IntegerType, nullable = false), 
         StructField("tag", StringType, nullable = false), 
         StructField("value", StringType, nullable = false)) 
    val df = sqlContext.createDataFrame(sc.parallelize(rows), StructType(fields)) 
    df.show() 

    val pivoted = pivotFrame(sqlContext, df) 
    pivoted.show() 

    val collapsed = collapseRows(sqlContext, pivoted) 
    collapsed.show() 
    } 
}