2017-08-01 107 views
0

通過scala/spark 1.6將RDD [Vector]轉換爲DataFrame的最佳解決方案是什麼? 輸入是不同的RDD [矢量]。 對於不同的RDD,Vector中的列號可以從1到n。Spark - 使用可變列將RDD [Vector]轉換爲DataFrame

我試過使用無形庫,蝙蝠他們需要聲明的列號和類型。 ES:

val df = rddVector.map(_.toArray.toList) 
    .collect { 
      case t: List[Double] if t.length == 3 => t.toHList[Double :: Double :: Double :: HNil].get.tupled.productArity 
    } 
    .toDF("column_1", "column_2", "column_3") 

謝謝!

+0

據我瞭解,我回答類似的東西在這裏:https://stackoverflow.com/a/45009516/7224597 你可以檢查是否適合你? – philantrovert

回答

2

這對我有效。

// Create a vector rdd 
    val vectorRDD = sc.parallelize(Seq(Seq(123L, 345L), Seq(567L, 789L), Seq(567L, 789L, 233334L))). 
    map(s => Vectors.dense(s.toSeq.map(_.toString.toDouble).toArray)) 

    // Calculate the maximum length of the vector to create a schema 
    val vectorLength = vectorRDD.map(x => x.toArray.length).max() 

    // create the dynamic schema 
    var schema = new StructType() 
    var i = 0 
    while (i < vectorLength) { 
    schema = schema.add(StructField(s"val${i}", DoubleType, true)) 
    i = i + 1 
    } 

    // create a rowRDD variable and make each row have the same arity 
    val rowRDD = vectorRDD.map { x => 
    var row = new Array[Double](vectorLength) 
    val newRow = x.toArray 

    System.arraycopy(newRow, 0, row, 0, newRow.length); 

    println(row.length) 

    Row.fromSeq(row) 
    } 

    // create your dataframe 
    val dataFrame = sqlContext.createDataFrame(rowRDD, schema) 

輸出:

root 
|-- val0: double (nullable = true) 
|-- val1: double (nullable = true) 
|-- val2: double (nullable = true) 

+-----+-----+--------+ 
| val0| val1| val2| 
+-----+-----+--------+ 
|123.0|345.0|  0.0| 
|567.0|789.0|  0.0| 
|567.0|789.0|233334.0| 
+-----+-----+--------+ 
+0

謝謝,在此解決方案中您必須創建一個固定的模式。 我不知道架構。該模式是可變的。我的Spark版本是1.6,不是2.0。 –

+1

我已更新答案以適應您提供的條件。它不是最好的解決方案,但會工作:) –

相關問題