1

我需要一個RDD轉換爲單個柱o.a.s.ml.linalg.Vector數據幀中,爲了使用ML算法,特別KMEANS對於這種情況。這是我的RDD:(陣列/ ML矢量/ MLlib矢量)RDD到ML矢量數據幀coulmn

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.mllib.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble)))) 

我試圖做什麼this答案沒有運氣所暗示的,我想是因爲你結束了一個mllib載體,其上運行的算法時拋出一個不匹配錯誤。現在,如果我改變了:

import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} 

val schema = new StructType() 
    .add("features", new VectorUDT()) 

這樣:

import org.apache.spark.ml.linalg.{Vectors, VectorUDT} 

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.ml.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble)))) 

val schema = new StructType() 
    .add("features", new VectorUDT()) 

因爲ML VectorUDT是私人的,我會得到一個錯誤。

我ALGO試圖轉換RDD爲雙打,以數據幀的數組,並獲得ML密集向量是這樣的:

var parsedData = sc.textFile("/home/pililo/Documents/Mi_Memoria/Codigo/Datasets/Digits/digits480x.csv").map(s => Row(s.split(',').slice(0,64).map(_.toDouble))) 

parsedData: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 

val schema2 = new StructType().add("features", ArrayType(DoubleType)) 

schema2: org.apache.spark.sql.types.StructType = StructType(StructField(features,ArrayType(DoubleType,true),true)) 

val df = spark.createDataFrame(parsedData, schema2) 

df: org.apache.spark.sql.DataFrame = [features: array<double>] 

val df2 = df.map{ case Row(features: Array[Double]) => Row(org.apache.spark.ml.linalg.Vectors.dense(features)) } 

會拋出下面的錯誤,即使spark.implicits._輸入:

error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 

任何幫助,非常感謝,謝謝!

回答

1

從我頭頂的:

  1. 使用csv源和VectorAssembler

    ​​
  2. 使用text源和UDF:

    def parse_(n: Int, m: Int)(s: String) = Try(
        Vectors.dense(s.split(',').slice(n, m).map(_.toDouble)) 
    ).toOption 
    
    def parse(n: Int, m: Int) = udf(parse_(n, m) _) 
    
    val raw = spark.read.text(path) 
    
    raw.select(parse(n, m)(col(raw.columns.head)).alias("features")) 
    
  3. 使用text源並放下包裝

    spark.read.text(path).as[String].map(parse_(n, m)).toDF 
    
+0

哇感謝您的回答,我給他們一個嘗試。任何想法,如果1)將是最有效的?其實我是做這樣的事情,但缺少切片輸入的cols列的方式,因爲他們是64個。另外我會很感激,如果你可以,如果你能解釋一下這部分1):exprs:_ *。是否像選擇所有列?非常感謝! – Pilailou

+0

2和3可稍快,因爲沒有涉及到CSV解析,但我不會注重這一點。 1.可以通過向讀者提供架構來改進。最後'_ *'是關於[varargs](http://stackoverflow.com/q/1008783/1560062)。它接受一個序列並將其「解壓」爲select的參數。 – zero323

+0

好我會再看看,再次感謝! – Pilailou