Scala/Spark: converting a zero-inflated DataFrame to LIBSVM format (I am new to this; I normally do it in R)
I have imported a large DataFrame (2000+ columns, 100,000+ rows) that is zero-inflated.
Task: convert the data into LIBSVM format.
Steps: As I understand it, the steps are as follows:
- Make sure the feature columns are of DoubleType and the target is an Int
- Iterate over each row, keeping each value > 0 in one array and the index of its column in a second array
- Convert to an RDD[LabeledPoint]
- Save the RDD in LIBSVM format (see the sketch after this list)
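For the last step, Spark's MLUtils can write an RDD[LabeledPoint] out as LIBSVM text (one "label index:value index:value ..." line per record). A minimal sketch, assuming the RDD produced in step 3 is called labeledRDD and using a placeholder output directory:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

// Writes the RDD as LIBSVM-formatted text files under the given directory.
def saveAsLibSvm(labeledRDD: RDD[LabeledPoint]): Unit =
  MLUtils.saveAsLibSVMFile(labeledRDD, "src/test/resources/libsvm-out")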
I am stuck on step 3, but possibly because I am doing step 2 wrong.
Here is my code:
Main function:
@Test
def testSpark(): Unit =
{
  try
  {
    var mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
    val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

    val indexer = new StringIndexer()
      .setInputCol("Majors_Final")
      .setOutputCol("Majors_Final_Indexed")
    val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
    val mDFFinal = castColumnTo(mDFTypedIndexed, "Majors_Final_Indexed", IntegerType)

    // only doubles accepted by sparse vector, so that's what we filter for
    val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

    val labeled: DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row, fieldNameSeq, row.getAs("Majors_Final_Indexed"))).toDF()

    assertTrue(true)
  }
  catch
  {
    case ex: Exception =>
    {
      println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
      fail()
    }
  }
}
Converting each Row to a LabeledPoint:
@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint =
{
  try
  {
    val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)
    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
    val rowValuesItr: Iterable[Double] = sortedValuesMap.values

    var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0

    rowValuesItr.foreach
    {
      kv =>
        if (kv > 0)
        {
          valuesArray += kv
          positionsArray += currentPosition
        }
        currentPosition = currentPosition + 1
    }

    // The first argument to Vectors.sparse is the total number of features,
    // not the number of non-zero entries.
    val lp: LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(fieldNameSeq.size, positionsArray.toArray, valuesArray.toArray))
    return lp
  }
  catch
  {
    case ex: Exception =>
    {
      throw new Exception(ex)
    }
  }
}
Problem: So I tried to create a DataFrame of LabeledPoints, which can easily be converted to an RDD.
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
but I got the following error:
SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
Have you tried importing 'spark.implicits._' as the error message suggests? Also, 'return' is not normally used in Scala and can cause problems. – Shaido
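A minimal sketch of that suggestion, assuming a Spark 2.x SparkSession named spark as in the test above (depending on the Spark version, the ml.feature.LabeledPoint and ml.linalg types may be needed instead of the mllib ones for the encoder to be derived):

import spark.implicits._  // provides the encoders that Dataset.map needs

// With the implicits in scope, the map yields a Dataset[LabeledPoint]
// instead of failing to compile with the "Unable to find encoder" error.
val labeled = mDFFinal.map(row =>
  convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int]("Majors_Final_Indexed")))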
I get "Task not serializable" (org.apache.spark.SparkException: Task not serializable) after adding the implicits – Jake
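That error usually means the closure passed to map is capturing the enclosing test class, since convertRowToLabeledPoint is an instance method of it. A hedged sketch of the usual remedy, moving the conversion into a standalone object so only that object is shipped to the executors (the object name and compacted body are made up for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical standalone helper: as a top-level object it serializes on its own
// and does not drag the surrounding (non-serializable) test class into the closure.
object LabeledPointConverter extends Serializable {
  def convert(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint = {
    // Keep column order stable, then collect the indices and values of non-zero entries.
    val sorted = rowIn.getValuesMap[Double](fieldNameSeq).toSeq.sortBy(_._1).map(_._2)
    val nonZero = sorted.zipWithIndex.filter(_._1 > 0)
    LabeledPoint(label, Vectors.sparse(fieldNameSeq.size, nonZero.map(_._2).toArray, nonZero.map(_._1).toArray))
  }
}

// Mapping over the underlying RDD also sidesteps the Dataset encoder machinery and
// yields the RDD[LabeledPoint] from step 3 directly.
val labeledRDD = mDFFinal.rdd.map(row =>
  LabeledPointConverter.convert(row, fieldNameSeq, row.getAs[Int]("Majors_Final_Indexed")))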
I think I need to look at matrices – Jake