2017-10-18 121 views
1

我的矩陣行數和列數未知,如何使用 Scala/Spark 將矩陣轉換爲 DataFrame?

一個例子Matrix是:

[5,1.3] 
[1,5.2] 

我想將它轉換成 DataFrame,列名可以任意指定,該怎麼做呢?這是我期待的結果:

+-------------+----+ 
    |   _1 | _2 | 
    +-------------+----+ 
    |5   |1.3 | 
    |1   |5.2 | 
    -------------------- 

回答

1

我建議你先將矩陣轉換爲 RDD,再將 RDD 轉換爲 DataFrame。這不是最好的辦法,但在 Spark 2.0.0 中可以正常工作。

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.{Row, SparkSession} 
import org.apache.spark.mllib.linalg._ 
import org.apache.spark.rdd.RDD 
/**
 * Example: convert a local mllib Matrix into a DataFrame (Spark 2.0.x).
 *
 * Strategy: matrix -> RDD[Vector] (one vector per matrix row) ->
 * RDD of tuples -> DataFrame with auto-derived column names (_1, _2).
 */
object mat2df {
    def main(args: Array[String]): Unit = {
     // Build a single SparkSession instead of a standalone SparkContext
     // plus a second session with a conflicting master setting; the
     // session wraps the one SparkContext we need.
     val spark = SparkSession.builder
      .appName("mat2df")
      .master("local[1]")
      .getOrCreate()
     val sc = spark.sparkContext

     // Column-major values of the 2x2 matrix [[5, 1.3], [1, 5.2]].
     // No cast to DenseMatrix is needed: everything below only uses
     // the Matrix interface.
     val values = Array(5, 1, 1.3, 5.2)
     val mat: Matrix = Matrices.dense(2, 2, values)

     /** Splits a column-major Matrix into one DenseVector per matrix row. */
     def toRDD(m: Matrix): RDD[Vector] = {
      val columns = m.toArray.grouped(m.numRows) // one group per column
      val rows = columns.toSeq.transpose         // regroup column data into rows
      val vectors = rows.map(row => new DenseVector(row.toArray))
      sc.parallelize(vectors)
     }

     val mat_rows = toRDD(mat) // matrix to rdd
     // Each row has exactly two entries; turning it into a tuple lets
     // createDataFrame derive the (_1, _2) schema automatically.
     val mat_rdd = mat_rows.map(_.toArray).map { case Array(p0, p1) => (p0, p1) }
     val df = spark.createDataFrame(mat_rdd) // rdd to dataframe
     df.show()
    }
}
1
/**
 * Converts a local Matrix into a DataFrame with one DataFrame row per
 * matrix column (each Row holds the numRows double entries of that column).
 *
 * @param sc            SparkContext used to parallelize the matrix data
 * @param matrix        source matrix, iterated column by column
 * @param m_nodeColName prefix for the generated field names
 * @return DataFrame with fields "<prefix>_0" .. "<prefix>_(numRows-1)"
 */
def matrixToDataFrame(sc: SparkContext, matrix: Matrix, m_nodeColName: String): DataFrame = {
    // One Row per matrix column; each Row carries that column's values.
    val rdd = sc.parallelize(matrix.colIter.toSeq).map(col => Row.fromSeq(col.toArray.toSeq))

    // BUG FIX: the original declared `val sc = new SQLContext(nodeContext.getSparkCtx())`,
    // which (a) referenced an undefined `nodeContext` and (b) shadowed the `sc`
    // parameter, turning the earlier `sc.parallelize` call into an illegal
    // forward reference. Build the SQLContext from the passed-in context.
    val sqlContext = new SQLContext(sc)

    // Each Row has matrix.numRows entries, so the schema needs exactly one
    // nullable DoubleType field per matrix row. (numRows == rowIter.size,
    // but avoids materializing the iterator just to count it.)
    val fields = (0 until matrix.numRows).map { i =>
     StructField(s"${m_nodeColName}_$i", DoubleType, nullable = true)
    }
    val schema = StructType(fields)

    sqlContext.sparkSession.createDataFrame(rdd, schema)
}