我有像在火花下面的一個數據幀,並欲由id
列組,然後在分組數據中的每個線我需要在由指定的索引來創建與來自weight
列元素的稀疏矢量index
列。稀疏矢量的長度是已知的,例如對於這個例子來說爲1000。如何使用Scala來聚合Spark數據框以獲得稀疏矢量?
數據幀df
:
+-----+------+-----+
| id|weight|index|
+-----+------+-----+
|11830| 1| 8|
|11113| 1| 3|
| 1081| 1| 3|
| 2654| 1| 3|
|10633| 1| 3|
|11830| 1| 28|
|11351| 1| 12|
| 2737| 1| 26|
|11113| 3| 2|
| 6590| 1| 2|
+-----+------+-----+
我已閱讀this這是有點類似的是我想做的事,但對於一個RDD。有誰知道使用Scala爲Spark中的數據框完成此操作的好方法嗎?
我嘗試到目前爲止是先收集權重和指標如表所示:
val dfWithLists = df
.groupBy("id")
.agg(collect_list("weight") as "weights", collect_list("index") as "indices"))
它看起來像:
+-----+---------+----------+
| id| weights| indices|
+-----+---------+----------+
|11830| [1, 1]| [8, 28]|
|11113| [1, 3]| [3, 2]|
| 1081| [1]| [3]|
| 2654| [1]| [3]|
|10633| [1]| [3]|
|11351| [1]| [12]|
| 2737| [1]| [26]|
| 6590| [1]| [2]|
+-----+---------+----------+
然後我定義一個UDF,做這樣的事情:
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))}
val udfToSparseVector = udf(toSparseVector)
val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))
但這似乎不工作,它感覺應該有一個更簡單的方法來做到這一點而不需要首先收集列表的權重和索引。
我很新的火花,Dataframes和Scala,因此任何幫助是高度讚賞。
謝謝!當索引向量按嚴格遞增順序排列時,這是有效的。有沒有辦法做到這一點,如果索引向量沒有排序?我得到這個錯誤:java.lang.IllegalArgumentException異常:要求失敗:指數324以下660,而且不嚴格增加 – joakimj
它現在使用無序對(指標,權重)的順序創建載體和它們的順序應該不再重要。 – Traian