這不是一個能拿出最好的解決辦法,但是從我的頭頂,它會做的工作:
import org.apache.spark.sql.Row
val df = sc.parallelize(List(("user1",1,5),("user2", 0, 13),("user2", 2, 4),("user3", 2, 7),("user3", 3, 45))).toDF("id","ind","freq")
df.show
// +-----+---+----+
// | id|ind|freq|
// +-----+---+----+
// |user1| 1| 5|
// |user2| 0| 13|
// |user2| 2| 4|
// |user3| 2| 7|
// |user3| 3| 45|
// +-----+---+----+
val df2 = df.groupBy('id).pivot("ind").max("freq").na.fill(0)
df2.show
// +-----+---+---+---+---+
// | id| 0| 1| 2| 3|
// +-----+---+---+---+---+
// |user1| 0| 5| 0| 0|
// |user2| 13| 0| 4| 0|
// |user3| 0| 0| 7| 45|
// +-----+---+---+---+---+
val cols = df2.columns
val df3 = df2.rdd.map {
case r : Row =>
val id = r.getAs[String]("id")
cols.map(ind => (id,ind,r.getAs[Integer](ind)))
}.flatMap(_.toSeq).filter(_._2 != "id").toDF("id","ind","freq")
df3.show
// +-----+---+----+
// | id|ind|freq|
// +-----+---+----+
// |user1| 0| 0|
// |user1| 1| 5|
// |user1| 2| 0|
// |user1| 3| 0|
// |user2| 0| 13|
// |user2| 1| 0|
// |user2| 2| 4|
// |user2| 3| 0|
// |user3| 0| 0|
// |user3| 1| 0|
// |user3| 2| 7|
// |user3| 3| 45|
// +-----+---+----+
我使用的pivot
功能從GroupeData
然後我按列平整它。 (Spark 1.6+)
PS:此解決方案沒有優化,我有很多缺點。即:大量的指標,計算成本等。
我投票,這是因爲它得到凌亂這樣做與Spark ... :) – eliasah
你解決這個問題拉米? – eliasah
嗨eliasah,我已經徹底改變了我的策略,所以我不再使用它了,但是你的解決方案是正確的,正如你所說的,它可能需要一些優化。謝謝 – Rami