2017-07-24 105 views
2

我有像在火花下面的一個數據幀,並欲由id列組,然後在分組數據中的每個線我需要在由指定的索引來創建與來自weight列元素的稀疏矢量index列。稀疏矢量的長度是已知的,例如對於這個例子來說爲1000。如何使用Scala來聚合Spark數據框以獲得稀疏矢量?

數據幀df

+-----+------+-----+ 
| id|weight|index| 
+-----+------+-----+ 
|11830|  1| 8| 
|11113|  1| 3| 
| 1081|  1| 3| 
| 2654|  1| 3| 
|10633|  1| 3| 
|11830|  1| 28| 
|11351|  1| 12| 
| 2737|  1| 26| 
|11113|  3| 2| 
| 6590|  1| 2| 
+-----+------+-----+ 

我已閱讀this這是有點類似的是我想做的事,但對於一個RDD。有誰知道使用Scala爲Spark中的數據框完成此操作的好方法嗎?

我嘗試到目前爲止是先收集權重和指標如表所示:

val dfWithLists = df 
    .groupBy("id") 
    .agg(collect_list("weight") as "weights", collect_list("index") as "indices")) 

它看起來像:

+-----+---------+----------+ 
| id| weights| indices| 
+-----+---------+----------+ 
|11830| [1, 1]| [8, 28]| 
|11113| [1, 3]| [3, 2]| 
| 1081|  [1]|  [3]| 
| 2654|  [1]|  [3]| 
|10633|  [1]|  [3]| 
|11351|  [1]|  [12]| 
| 2737|  [1]|  [26]| 
| 6590|  [1]|  [2]| 
+-----+---------+----------+ 

然後我定義一個UDF,做這樣的事情:

import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.sql.functions.udf 

def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))} 
val udfToSparseVector = udf(toSparseVector) 

val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights")) 

但這似乎不工作,它感覺應該有一個更簡單的方法來做到這一點而不需要首先收集列表的權重和索引。

我很新的火花,Dataframes和Scala,因此任何幫助是高度讚賞。

回答

3

你一定要收集他們作爲載體必須是本地的,單機:https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector

爲了創建稀疏矢量你有2個選擇,使用無序(指數值)對或指定索引和值數組: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors$

如果你能得到的數據轉換成不同的格式(迴轉),你也可以利用VectorAssembler的: https://spark.apache.org/docs/latest/ml-features.html#vectorassembler

有了一些小的調整,你可以得到你的方法工作:

:paste 
// Entering paste mode (ctrl-D to finish) 

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.mllib.regression.LabeledPoint 

val df = Seq((11830,1,8), (11113, 1, 3), (1081, 1,3), (2654, 1, 3), (10633, 1, 3), (11830, 1, 28), (11351, 1, 12), (2737, 1, 26), (11113, 3, 2), (6590, 1, 2)).toDF("id", "weight", "index") 

val dfWithFeat = df 
    .rdd 
    .map(r => (r.getInt(0), (r.getInt(2), r.getInt(1).toDouble))) 
    .groupByKey() 
    .map(r => LabeledPoint(r._1, Vectors.sparse(1000, r._2.toSeq))) 
    .toDS 

dfWithFeat.printSchema 
dfWithFeat.show(10, false) 


// Exiting paste mode, now interpreting. 

root 
|-- label: double (nullable = true) 
|-- features: vector (nullable = true) 

+-------+-----------------------+ 
|label |features    | 
+-------+-----------------------+ 
|11113.0|(1000,[2,3],[3.0,1.0]) | 
|2737.0 |(1000,[26],[1.0])  | 
|10633.0|(1000,[3],[1.0])  | 
|1081.0 |(1000,[3],[1.0])  | 
|6590.0 |(1000,[2],[1.0])  | 
|11830.0|(1000,[8,28],[1.0,1.0])| 
|2654.0 |(1000,[3],[1.0])  | 
|11351.0|(1000,[12],[1.0])  | 
+-------+-----------------------+ 

dfWithFeat: org.apache.spark.sql.Dataset[org.apache.spark.mllib.regression.LabeledPoint] = [label: double, features: vector] 
+0

謝謝!當索引向量按嚴格遞增順序排列時,這是有效的。有沒有辦法做到這一點,如果索引向量沒有排序?我得到這個錯誤:java.lang.IllegalArgumentException異常:要求失敗:指數324以下660,而且不嚴格增加 – joakimj

+0

它現在使用無序對(指標,權重)的順序創建載體和它們的順序應該不再重要。 – Traian

相關問題