2016-11-11 89 views
0

我具有由火花數據幀像下面的輸出:劈裂列在將新行[斯卡拉]

金額| ID |數字|開始_ |標識符
43.45 | 19840 | A345 | [2014 -12-26,2013-12-12] | [232323,45466] |
43.45 | 19840 | A345 | [2010-03-16,2013-16-12] | [34343,45454] |

我的要求是從上述輸出

金額以下格式以生成輸出| ID |數字|開始_ |標識符
43.45 | 19840 | A345 | 2014-12-26 | 232323
43.45 | 19840 | A345 |二〇一三年十二月十二日| 45466
43.45 | 19840 | A345 | 2010-03-16 | 34343
43.45 | 19840 | A345 | 2013-16-12 | 45454

有人可以幫我實現這個。

回答

1

這是你要找的東西嗎?

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 

val sparkSession = ... 
import sparkSession.implicits._ 

val input = sc.parallelize(Seq(
    (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq(232323,45466)), 
    (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq(34343,45454)) 
)).toDF("amt", "id", "num", "start_date", "identifier") 

val zipArrays = udf { (dates: Seq[String], identifiers: Seq[Int]) => 
    dates.zip(identifiers) 
} 

val output = input.select($"amt", $"id", $"num", explode(zipArrays($"start_date", $"identifier"))) 
    .select($"amt", $"id", $"num", $"col._1".as("start_date"), $"col._2".as("identifier")) 

output.show() 

將返回:

+-----+-----+----+----------+----------+ 
| amt| id| num|start_date|identifier| 
+-----+-----+----+----------+----------+ 
|43.45|19840|A345|2014-12-26| 232323| 
|43.45|19840|A345|2013-12-12|  45466| 
|43.45|19840|A345|2010-03-16|  34343| 
|43.45|19840|A345|2013-16-12|  45454| 
+-----+-----+----+----------+----------+ 

編輯:

既然你想有應拉上多列,你應該嘗試這樣的事:

val input = sc.parallelize(Seq(
    (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq("232323","45466"), Seq("123", "234")), 
    (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq("34343","45454"), Seq("345", "456")) 
)).toDF("amt", "id", "num", "start_date", "identifier", "another_column") 

val zipArrays = udf { seqs: Seq[Seq[String]] => 
    for(i <- seqs.head.indices) yield seqs.fold(Seq.empty)((accu, seq) => accu :+ seq(i)) 
} 

val columnsToSelect = Seq($"amt", $"id", $"num") 
val columnsToZip = Seq($"start_date", $"identifier", $"another_column") 
val outputColumns = columnsToSelect ++ columnsToZip.zipWithIndex.map { case (column, index) => 
    $"col".getItem(index).as(column.toString()) 
} 

val output = input.select($"amt", $"id", $"num", explode(zipArrays(array(columnsToZip: _*)))).select(outputColumns: _*) 

output.show() 

/* 
+-----+-----+----+----------+----------+--------------+ 
| amt| id| num|start_date|identifier|another_column| 
+-----+-----+----+----------+----------+--------------+ 
|43.45|19840|A345|2014-12-26| 232323|   123| 
|43.45|19840|A345|2013-12-12|  45466|   234| 
|43.45|19840|A345|2010-03-16|  34343|   345| 
|43.45|19840|A345|2013-16-12|  45454|   456| 
+-----+-----+----+----------+----------+--------------+ 
*/ 
+0

是的,這正是我所看到的。但我認爲這隻有當我有兩列(start_date&標識符)時纔有效。是否有可能使其成爲動態的,以便它可以用於任意數量的列? –

+0

這是可能的,但我想到的解決方案需要所有這些壓縮列具有相同的類型(例如'string',所以'identifier'列中的所有值都必須是字符串)。你會接受嗎? –

+0

是的。如果你分享示例代碼,那將是非常好的。謝謝。 –

0

如果我理解正確,您需要第3列和第4列的第一個元素。 這是否有意義?

val newDataFrame = for { 
    row <- oldDataFrame 
} yield { 
    val zro = row(0) // 43.45 
    val one = row(1) // 19840 
    val two = row(2) // A345 
    val dates = row(3) // [2014-12-26, 2013-12-12] 
    val numbers = row(4) // [232323,45466] 
    Row(zro, one, two, dates(0), numbers(0)) 
} 
+0

沒有,我想要第一個和第二個元素。 col3和col4的第一個元素在一行中,col3的第二個元素和第二行中的col4。 樣本輸入:43.45 | 19840 | A345 | [2014-12-26,2013-12-12] | [232323,45466] |輸出:43.45 | 19840 | A345 | 2014-12-26 | 232323 43.45 | 19840 | A345 | 2013-12-12 | 45466 –

0

你可以使用SparkSQL。

  • 首先創建與信息的視圖,我們需要處理:

    df.createOrReplaceTempView("tableTest")

  • 然後你就可以用擴展選擇數據:

    sparkSession.sqlContext.sql(
        "SELECT Amt, id, num, expanded_start_date, expanded_id " + 
        "FROM tableTest " + 
        "LATERAL VIEW explode(Start_date) Start_date AS expanded_start_date " + 
        "LATERAL VIEW explode(Identifier) AS expanded_id") 
    .show()