2017-05-18 111 views
1

我有一個火花數據框df下面的模式:星火據幀到數據幀[矢量]

root 
|-- features: array (nullable = true) 
| |-- element: double (containsNull = false) 

我想創建一個新的數據幀,每一行會的Double個向量並期望得到以下模式:

root 
    |-- features: vector (nullable = true) 

到目前爲止,我有以下的代碼(由這篇文章的影響:Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala),但我擔心的東西是錯誤與我因爲計算合理數量的行需要很長時間。另外,如果行數太多,應用程序將崩潰併產生堆空間異常。

val clustSet = df.rdd.map(r => { 
      val arr = r.getAs[mutable.WrappedArray[Double]]("features") 
      val features: Vector = Vectors.dense(arr.toArray) 
      features 
      }).map(Tuple1(_)).toDF() 

我懷疑在這種情況下,指令arr.toArray不是一個好的Spark練習。任何澄清都會非常有幫助。

謝謝!

回答

4

這是因爲.rdd有反序列化從內部內存格式的對象,這是非常耗時。

這是確定使用.toArray - 正在以行級,不收集一切駕駛員節點。

你可以做到這一點很容易的UDF:

import org.apache.spark.ml.linalg._ 
val convertUDF = udf((array : Seq[Double]) => { 
    Vectors.dense(array.toArray) 
}) 
val withVector = dataset 
    .withColumn("features", convertUDF('features)) 

代碼是從這樣的回答:Convert ArrayType(FloatType,false) to VectorUTD

然而,有問題的作者並沒有問差異

+0

非常感謝你,這對我有很大幫助,並將其標記爲答案。我現在可以運行更多的行,並且它在時間上是令人滿意的。我仍然得到一個異常:__org.apache.spark.SparkException:Kryo序列化失敗:緩衝區溢出。可用:0,必需:1__當我嘗試200,000行時。你會對此有所瞭解嗎?再次感謝。 – user159941

+0

@ user159941請檢查http://stackoverflow.com/questions/31947335/how-kryo-serializer-allocates-buffer-in-spark –

+1

我在我的代碼設置如下:** VAL的conf =新SparkConf() 。設置(「spark.serializer」,「org.apache.spark.serializer.KryoSerializer」) .set(「spark.kryoserializer.buffer.max.mb」,「256」)**,它的工作!謝謝。 – user159941