First off, I'd note that I can't explain why your explode() turns into Row(employee: Seq[Row]), since I don't know the schema of your DataFrame. I have to assume it has to do with the structure of your data.
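For reference, an array-of-structs column is the usual way a Seq[Row] shows up, so your schema may contain something along these lines. This is only a sketch; the Employee class and the column names are made up for illustration, not taken from your data:

import org.apache.spark.sql.Row

case class Employee(name: String, age: Int)
// a column holding an array of structs comes back as Seq[Row] when read generically
val nested = sc.parallelize(Seq((1, Seq(Employee("Ann", 30))))).toDF("dept", "employee")
nested.map { case row: Row => row.getSeq[Row](1) }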
Not knowing your original data, I created a small data set to work from:
scala> val df = sc.parallelize(Array((1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]
Working from that, if I now map over it, you can see that it returns Rows containing data of Any type:
scala> df.map {case row: Row => (row(0), row(1)) }
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33
You have basically lost the type information, which is why you need to explicitly specify the type when you want to use the data in the Row:
scala> df.map {case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) }
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33
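As a side note, Row also has typed getters, which read a little cleaner than casting. This is just a stylistic alternative with the same effect (and the same runtime failure if the types don't match):

df.map { case row: Row => (row.getInt(0), row.getString(1)) }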
So, in order to explode it, I have to do the following:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
df.explode(col("id"), col("text")) {case row: Row =>
val id = row(0).asInstanceOf[Int]
val words = row(1).asInstanceOf[String].split(" ")
words.map(word => (id, word))
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]
scala> res30.show
+---+--------------------+---+-----+
| id| text| _1| _2|
+---+--------------------+---+-----+
| 1|dsfds dsf dasf ds...| 1|dsfds|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dasf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| d|
| 2|2344 2353 24 2343...| 2| 2344|
| 2|2344 2353 24 2343...| 2| 2353|
| 2|2344 2353 24 2343...| 2| 24|
| 2|2344 2353 24 2343...| 2|23432|
| 2|2344 2353 24 2343...| 2| 234|
+---+--------------------+---+-----+
If you want the columns to be named, you can define a case class to hold your exploded data:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
case class ExplodedData(word: String)
df.explode(col("id"), col("text")) {case row: Row =>
val words = row(1).asInstanceOf[String].split(" ")
words.map(word => ExplodedData(word))
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]
scala> res35.select("id","word").show
+---+-----+
| id| word|
+---+-----+
| 1|dsfds|
| 1| dsf|
| 1| dasf|
| 1| dsf|
| 1| dsf|
| 1| d|
| 2| 2344|
| 2| 2353|
| 2| 24|
| 2|23432|
| 2| 234|
+---+-----+
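One last note: DataFrame.explode was deprecated in Spark 2.0 in favor of the explode function, so on newer versions you would write something like the sketch below (the output column name word is my choice, not part of the session above):

import org.apache.spark.sql.functions.{col, explode, split}

df.select(col("id"), explode(split(col("text"), " ")).as("word"))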
Hope this brings some clarity.