The code below should give you a complete picture of how to achieve what you are after. Tested on Spark 1.6.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
val lst = List(Seq("Hello","Hyd","Hello","Mumbai"),Seq("Hello","Mumbai"),Seq("Hello","Delhi","Hello","Banglore"))
case class Tweets(filtered: Seq[String])
val df = sc.parallelize(lst).map(x=>Tweets(x)).toDF
import org.apache.spark.sql.functions.{explode, count}
df.select(explode($"filtered").as("value"))
  .groupBy("value")
  .agg(count("*").alias("cnt"))
  .orderBy('cnt.desc)
  .show(20, false)
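To see the counting logic without a Spark cluster, here is a plain-Scala sketch of the same explode / groupBy / count / orderBy pipeline, run over the same sample `lst` from the answer (collections only, no Spark):

```scala
// Plain-Scala equivalent of the Spark pipeline above, using the same sample data.
val lst = List(Seq("Hello", "Hyd", "Hello", "Mumbai"),
               Seq("Hello", "Mumbai"),
               Seq("Hello", "Delhi", "Hello", "Banglore"))

val counts = lst.flatten                            // explode($"filtered")
  .groupBy(identity)                                // groupBy("value")
  .map { case (word, occs) => (word, occs.size) }   // agg(count("*").alias("cnt"))
  .toList
  .sortBy(-_._2)                                    // orderBy('cnt.desc)

println(counts.mkString("\n"))                      // ("Hello",5) comes first
```

"Hello" appears five times across the three sequences, so it tops the result, with the remaining words tied lower down.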
Alternatively, you can use a window function.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
val lst = List(Seq("Hello","Hyd","Hello","Mumbai"),Seq("Hello","Mumbai"),Seq("Hello","Delhi","Hello","Banglore"))
case class Tweets(filtered: Seq[String])
val df = sc.parallelize(lst).map(x=>Tweets(x)).toDF
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Note: a window with no partitionBy moves all rows into a single partition,
// which Spark will warn about; it is fine for small data like this.
val w = Window.orderBy('cnt.desc)
df.select(explode($"filtered").as("value"))
  .groupBy("value")
  .agg(count("*").alias("cnt"))
  .withColumn("filteredrank", rank.over(w))
  .filter(col("filteredrank") <= 20)
  .show()
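For intuition, `rank().over(w)` gives tied counts the same rank and then skips ahead past the tie group (unlike `dense_rank`). Here is a plain-Scala sketch of that ranking step applied to the counts the sample data produces, so you can check the semantics without Spark:

```scala
// Plain-Scala sketch of rank().over(Window.orderBy('cnt.desc)):
// tied counts share a rank; the next distinct count's rank skips past the ties.
val counted = List(("Hello", 5), ("Mumbai", 2), ("Hyd", 1), ("Delhi", 1), ("Banglore", 1))
val sorted  = counted.sortBy(-_._2)
val ranked  = sorted.map { case (word, cnt) =>
  // rank = 1-based position of the first row having the same cnt
  (word, cnt, sorted.indexWhere(_._2 == cnt) + 1)
}
val top20 = ranked.filter(_._3 <= 20)   // filter(col("filteredrank") <= 20)
```

With these counts, the three words tied at 1 all receive rank 3, mirroring what the Spark window version shows.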
Why did you change the title? It makes no sense now. – schoon