這是你要找的東西嗎?
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sparkSession = ...
import sparkSession.implicits._
val input = sc.parallelize(Seq(
(43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq(232323,45466)),
(43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq(34343,45454))
)).toDF("amt", "id", "num", "start_date", "identifier")
val zipArrays = udf { (dates: Seq[String], identifiers: Seq[Int]) =>
dates.zip(identifiers)
}
val output = input.select($"amt", $"id", $"num", explode(zipArrays($"start_date", $"identifier")))
.select($"amt", $"id", $"num", $"col._1".as("start_date"), $"col._2".as("identifier"))
output.show()
將返回:
+-----+-----+----+----------+----------+
| amt| id| num|start_date|identifier|
+-----+-----+----+----------+----------+
|43.45|19840|A345|2014-12-26| 232323|
|43.45|19840|A345|2013-12-12| 45466|
|43.45|19840|A345|2010-03-16| 34343|
|43.45|19840|A345|2013-16-12| 45454|
+-----+-----+----+----------+----------+
編輯:
既然你想有應拉上多列,你應該嘗試這樣的事:
val input = sc.parallelize(Seq(
(43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq("232323","45466"), Seq("123", "234")),
(43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq("34343","45454"), Seq("345", "456"))
)).toDF("amt", "id", "num", "start_date", "identifier", "another_column")
val zipArrays = udf { seqs: Seq[Seq[String]] =>
for(i <- seqs.head.indices) yield seqs.fold(Seq.empty)((accu, seq) => accu :+ seq(i))
}
val columnsToSelect = Seq($"amt", $"id", $"num")
val columnsToZip = Seq($"start_date", $"identifier", $"another_column")
val outputColumns = columnsToSelect ++ columnsToZip.zipWithIndex.map { case (column, index) =>
$"col".getItem(index).as(column.toString())
}
val output = input.select($"amt", $"id", $"num", explode(zipArrays(array(columnsToZip: _*)))).select(outputColumns: _*)
output.show()
/*
+-----+-----+----+----------+----------+--------------+
| amt| id| num|start_date|identifier|another_column|
+-----+-----+----+----------+----------+--------------+
|43.45|19840|A345|2014-12-26| 232323| 123|
|43.45|19840|A345|2013-12-12| 45466| 234|
|43.45|19840|A345|2010-03-16| 34343| 345|
|43.45|19840|A345|2013-16-12| 45454| 456|
+-----+-----+----+----------+----------+--------------+
*/
是的,這正是我所看到的。但我認爲這隻有當我有兩列(start_date&標識符)時纔有效。是否有可能使其成爲動態的,以便它可以用於任意數量的列? –
這是可能的,但我想到的解決方案需要所有這些壓縮列具有相同的類型(例如'string',所以'identifier'列中的所有值都必須是字符串)。你會接受嗎? –
是的。如果你分享示例代碼,那將是非常好的。謝謝。 –