2016-09-25 156 views
0

我對Python很熟悉,我正在學習Spark-Scala。在Spark-Scala中,如何將數組列表複製到DataFrame中?

我想建立具有由這種語法desribed結構的數據幀:

// Prepare training data from a list of (label, features) tuples. 
val training = spark.createDataFrame(Seq(
    (1.1, Vectors.dense(1.1, 0.1)), 
    (0.2, Vectors.dense(1.0, -1.0)), 
    (3.0, Vectors.dense(1.3, 1.0)), 
    (1.0, Vectors.dense(1.2, -0.5)) 
)).toDF("label", "features") 

我從這個網址上面的語法: http://spark.apache.org/docs/latest/ml-pipeline.html

目前我的數據是數組,我已經退出出了DF的:

val my_a = gspc17_df.collect().map{row => Seq(row(2),Vectors.dense(row(3).asInstanceOf[Double],row(4).asInstanceOf[Double]))} 

我的陣列的結構非常類似於上述DF:

my_a: Array[Seq[Any]] = 
Array(
    List(-1.4830674013266898, [-0.004192832940431825,-0.003170667657263393]), 
    List(-0.05876766500768526, [-0.008462913654529357,-0.006880595828929472]), 
    List(1.0109273250546658, [-3.1816797620416693E-4,-0.006502619326182358])) 

如何將數據從我的數組複製到具有上述結構的DataFrame?

我想這句法:

val my_df = spark.createDataFrame(my_a).toDF("label","features") 

星火我吼道:

<console>:105: error: inferred type arguments [Seq[Any]] do not conform to method createDataFrame's type parameter bounds [A <: Product] 
     val my_df = spark.createDataFrame(my_a).toDF("label","features") 
         ^
<console>:105: error: type mismatch; 
found : scala.collection.mutable.WrappedArray[Seq[Any]] 
required: Seq[A] 
     val my_df = spark.createDataFrame(my_a).toDF("label","features") 
             ^
scala> 

回答

4

這裏的第一個問題是,你使用List存儲行數據。列表是同類數據結構,並且由於Anyrow(2))和DenseVector的唯一常見類型是AnyObject),所以最終的結果爲Seq[Any]

下一個問題是你根本用row(2)。由於實際上是Any的一個集合,因此此操作不會返回任何有用的類型,並且不會將結果存儲在DataFrame中,而不會提供明確的Encoder

從更加火花的角度來看,它也不是好方法。 collect - 只是爲了轉換數據,不應該要求任何評論和。映射到Rows只是爲了創建Vectors也沒有多大意義。

假設沒有類型不匹配,你可以使用VectorAssembler

import org.apache.spark.ml.feature.VectorAssembler 

val assembler = new VectorAssembler() 
    .setInputCols(Array(df.columns(3), df.columns(4))) 
    .setOutputCol("features") 

assembler.transform(df).select(df.columns(2), "features") 

,或者如果你真的想手動的UDF處理這個問題。

val toVec = udf((x: Double, y: Double) => Vectors.dense(x, y)) 

df.select(col(df.columns(2)), toVec(col(df.columns(3)), col(df.columns(4)))) 

一般來說,我強烈建議在開始使用Spark之前熟悉Scala。

相關問題