2017-09-01 123 views

Scala/Spark: converting zero-inflated data in a DataFrame to LIBSVM

I am very new to Scala (I usually do this in R).

I have imported a large DataFrame (2000+ columns, 100000+ rows) that is zero-inflated.

Task: convert the data to LIBSVM format.

Steps: as I understand it, the steps are as follows:

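  1. Make sure the feature columns are DoubleType and the target is an Int.
  2. Iterate over each row, keeping every value > 0 in one array and the index of its column in another array.
  3. Convert to RDD[LabeledPoint].
  4. Save the RDD in LIBSVM format.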
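Step 2 does not need Spark at all. A minimal sketch, assuming one row's features are already available as an `Array[Double]` in a fixed column order; `SparseStep` and `nonZeroEntries` are hypothetical names, not from the question or from Spark:

```scala
object SparseStep {
  /** Returns (indices, values) for every non-zero entry of a dense feature row.
    * Indices are 0-based positions in the row, in increasing order. */
  def nonZeroEntries(row: Array[Double]): (Array[Int], Array[Double]) = {
    // Pair each value with its position and drop the zeros
    val nz = row.zipWithIndex.filter { case (v, _) => v != 0.0 }
    (nz.map(_._2), nz.map(_._1))
  }
}
```

For example, `SparseStep.nonZeroEntries(Array(0.0, 1.5, 0.0, 3.0))` gives the indices `1, 3` and the values `1.5, 3.0`. This version keeps every non-zero value, including negative ones; change the filter to `v > 0` if only positive values should count, as in step 2.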

I am stuck on step 3 (but perhaps that's because I am doing step 2 wrong).

Here is my code:

Main function:

@Test
def testSpark(): Unit =
{
  try
  {
    val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")

    // castAllTypedColumnsTo and castColumnTo are my own helper functions (not shown here)
    val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

    // Encode the string target column as a numeric label
    val indexer = new StringIndexer()
      .setInputCol("Majors_Final")
      .setOutputCol("Majors_Final_Indexed")
    val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
    val mDFFinal = castColumnTo(mDFTypedIndexed, "Majors_Final_Indexed", IntegerType)

    // Only doubles are accepted by the sparse vector, so that's what we filter for
    val fieldSeq: Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

    val labeled: DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row, fieldNameSeq, row.getAs("Majors_Final_Indexed"))).toDF()

    assertTrue(true)
  }
  catch
  {
    case ex: Exception =>
      println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
      fail()
  }
}

Converting each row to a LabeledPoint:

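import scala.collection.immutable.ListMap
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.Row

@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint =
{
  try
  {
    // getValuesMap returns an unordered Map, so sort by column name to get the same feature order for every row
    val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)
    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
    val rowValuesItr: Iterable[Double] = sortedValuesMap.values

    // Keep each value > 0 together with its 0-based feature position
    val positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    val valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0
    rowValuesItr.foreach
    {
      kv =>
        if (kv > 0)
        {
          valuesArray += kv
          positionsArray += currentPosition
        }
        currentPosition = currentPosition + 1
    }

    // The first argument is the full vector length (number of features), not the number of non-zero values
    val lp: LabeledPoint = new LabeledPoint(label, Vectors.sparse(fieldNameSeq.size, positionsArray.toArray, valuesArray.toArray))

    return lp
  }
  catch
  {
    case ex: Exception =>
      throw new Exception(ex)
  }
}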
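A detail worth checking when building the vector: the first argument of `Vectors.sparse` is the vector's total length (the number of features), not the number of stored non-zero values. Using the non-zero count makes the vector too short whenever a zero comes before a non-zero value. The sketch below illustrates this with a hypothetical `SparseFeatures` class (not a Spark type) that enforces the usual sparse-vector invariants: equal-length arrays, strictly increasing indices, and every index below the size.

```scala
/** Hypothetical stand-in for a sparse feature vector; not a Spark class. */
final case class SparseFeatures(size: Int, indices: Array[Int], values: Array[Double]) {
  require(indices.length == values.length, "indices and values must have the same length")
  require(indices.zip(indices.drop(1)).forall { case (a, b) => a < b }, "indices must be strictly increasing")
  require(indices.forall(i => i >= 0 && i < size), "every index must lie in [0, size)")
}

object SparseRows {
  /** Builds a sparse row whose size is the full feature count, i.e. the dense row length. */
  def fromDense(row: Array[Double]): SparseFeatures = {
    val nz = row.indices.filter(i => row(i) != 0.0).toArray
    SparseFeatures(row.length, nz, nz.map(row(_)))
  }
}
```

`SparseRows.fromDense(Array(0.0, 0.0, 2.0))` has size 3 and a single entry at index 2, whereas `SparseFeatures(1, Array(2), Array(2.0))` (size set to the number of non-zeros) fails the last `require` with an `IllegalArgumentException`.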

Problem: I then try to create a DataFrame of LabeledPoints that can easily be converted to an RDD.

val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF() 

But I get the following error:

SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()


Did you try 'import spark.implicits._' as the error message suggests? Also, 'return' is not usually used in Scala and can cause problems. – Shaido


After adding the implicits I get "Task not serializable": org.apache.spark.SparkException: Task not serializable – Jake


I think I need to look at matrices – Jake
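A likely cause of the "Task not serializable" error: `convertRowToLabeledPoint` is a method of the test class, so the lambda passed to `map` captures the enclosing test instance, and Spark must serialize that instance (with Java serialization) to ship the closure to the executors. The pure-Scala sketch below involves no Spark; `TestSuiteLike`, `RowFunctions` and `SerializationCheck` are hypothetical names. It reproduces the effect with plain Java serialization: a lambda that calls a method of a non-serializable class cannot be serialized, while one that only calls a function on a top-level `object` can. Putting such functions in a singleton object is the pattern the Spark programming guide recommends for functions passed to Spark.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the test class; it does NOT extend Serializable.
class TestSuiteLike {
  private def countNonZero(arr: Array[Double]): Int = arr.count(_ != 0.0)

  // Calls a method of this class, so the lambda captures `this`.
  def viaMethod: Array[Double] => Int = arr => countNonZero(arr)

  // Calls a function on a top-level object, so nothing from `this` is captured.
  def viaObject: Array[Double] => Int = arr => RowFunctions.countNonZero(arr)
}

// Functions used inside Spark closures can live in a singleton object like this.
object RowFunctions {
  def countNonZero(arr: Array[Double]): Int = arr.count(_ != 0.0)
}

object SerializationCheck {
  /** True if plain Java serialization of `obj` succeeds. */
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Applied to the question, this would mean moving `convertRowToLabeledPoint` into an `object` and calling it from there inside the `map`.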

Answer


OK, so I skipped the DataFrame and created an array of LabeledPoints, which is easily converted into an RDD. The rest is easy.

I should stress that, although this works, I am very new to Scala and there may be more efficient ways to do this.
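To put the efficiency caveat in numbers: `collect()` pulls every row onto the driver. Assuming exactly 100,000 rows by 2,000 double columns (the question only says "2000+" and "100000+"), the raw values alone need at least 1.6 GB before any `Row` or boxing overhead. A back-of-the-envelope helper (`DriverFootprint` is a hypothetical name):

```scala
object DriverFootprint {
  /** Lower bound in bytes for a dense rows x cols table of 8-byte doubles. */
  def denseBytes(rows: Long, cols: Long): Long = rows * cols * java.lang.Double.BYTES
}
```

`DriverFootprint.denseBytes(100000L, 2000L)` is 1,600,000,000 bytes. Mapping over `mDFFinal.rdd` instead keeps the conversion on the executors, so the whole table is never materialized on the driver.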

The main function is now as follows:

val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

val indexer = new StringIndexer()
  .setInputCol("Majors_Final")
  .setOutputCol("Majors_Final_Indexed")
val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
val mDFFinal = castColumnTo(mDFTypedIndexed, "Majors_Final_Indexed", IntegerType)

mDFFinal.show()
// Only doubles are accepted by the sparse vector, so that's what we filter for
val fieldSeq: Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

// Convert on the driver after collect(), so no Dataset encoder is needed
val labeledPoints: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()

mDFFinal.collect().foreach
{
  row => labeledPoints += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs("Majors_Final_Indexed"))
}

// Distribute the LabeledPoints again and write them out in LIBSVM format
val mRdd: RDD[LabeledPoint] = spark.sparkContext.parallelize(labeledPoints.toSeq)

MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")
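For reference, each line written by `MLUtils.saveAsLibSVMFile` has the form `label index:value ...` with 1-based indices, so the 0-based positions stored in the sparse vector appear shifted by one in the file. A small sketch of that line format, useful for checking the output by hand (`LibSVMFormat.toLibSVMLine` is a hypothetical helper, not Spark's own code):

```scala
object LibSVMFormat {
  /** Formats one labeled sparse row as a LIBSVM text line, converting 0-based indices to 1-based. */
  def toLibSVMLine(label: Double, indices: Array[Int], values: Array[Double]): String = {
    require(indices.length == values.length, "indices and values must have the same length")
    val features = indices.zip(values).map { case (i, v) => s"${i + 1}:$v" }
    (label.toString +: features).mkString(" ")
  }
}
```

`LibSVMFormat.toLibSVMLine(2.0, Array(0, 3), Array(1.5, 3.0))` returns `"2.0 1:1.5 4:3.0"`; a row with no non-zero features is just the label, `"2.0"`.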