This works for me.
// Assumption: the mllib Vectors are meant; swap in org.apache.spark.ml.linalg.Vectors if you use the ml package
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Create an RDD of vectors with differing lengths
val vectorRDD = sc.parallelize(Seq(Seq(123L, 345L), Seq(567L, 789L), Seq(567L, 789L, 233334L)))
  .map(s => Vectors.dense(s.map(_.toDouble).toArray))

// Find the maximum vector length; it determines the number of columns
val vectorLength = vectorRDD.map(_.size).max()

// Build the schema dynamically: one nullable double column per position
val schema = StructType((0 until vectorLength).map(i => StructField(s"val$i", DoubleType, nullable = true)))

// Pad each vector with trailing zeros so that every row has the same arity
val rowRDD = vectorRDD.map { v =>
  val row = new Array[Double](vectorLength) // elements default to 0.0
  val values = v.toArray
  System.arraycopy(values, 0, row, 0, values.length)
  Row.fromSeq(row)
}

// Create the DataFrame and inspect it
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
dataFrame.printSchema()
dataFrame.show()
Output:
root
|-- val0: double (nullable = true)
|-- val1: double (nullable = true)
|-- val2: double (nullable = true)
+-----+-----+--------+
| val0| val1|    val2|
+-----+-----+--------+
|123.0|345.0|     0.0|
|567.0|789.0|     0.0|
|567.0|789.0|233334.0|
+-----+-----+--------+
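The padding step can also be written without a mutable array. Below is a minimal sketch using plain Scala collections only (no Spark), so it can be tried in a REPL. `padRows` and `columnNames` are hypothetical helper names introduced for illustration; they are not part of the code above or of any Spark API.

```scala
// Hypothetical helper (illustration only): pads every row with `fill`
// so that all rows have the length of the longest row.
def padRows(rows: Seq[Seq[Double]], fill: Double = 0.0): Seq[Seq[Double]] = {
  // Width of the widest row; 0 when there are no rows at all
  val width = if (rows.isEmpty) 0 else rows.map(_.length).max
  rows.map(_.padTo(width, fill))
}

// Hypothetical helper: column names in the same val0, val1, ... pattern as the schema
def columnNames(width: Int): Seq[String] = (0 until width).map(i => s"val$i")

val padded = padRows(Seq(
  Seq(123.0, 345.0),
  Seq(567.0, 789.0),
  Seq(567.0, 789.0, 233334.0)
))
```

Inside the Spark job the same idea would presumably read `Row.fromSeq(v.toArray.padTo(vectorLength, 0.0).toSeq)`. Note that a padded 0.0 cannot be told apart from a genuine zero in the data; since the columns are declared nullable, filling with null instead is an option if that distinction matters.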
From what I understand, I answered something similar here: https://stackoverflow.com/a/45009516/7224597 Could you check whether it works for you? – philantrovert