2017-01-28 54 views
0

Dataframe更頻繁值:如何計算與考慮火花

+---+---+ 
| c1| c2| 
+---+---+ 
| A| 1| 
| A| 2| 
| A| 1| 
| B| 3| 
| B| 4| 
| B| 4| 
+---+---+ 

我要計算每個值C1,C2

+---+---+ 
| c1| c2| 
+---+---+ 
| A| 1| 
| B| 4| 
+---+---+ 

這裏的更頻繁的價值是我當前的代碼(Spark 1.6.0)

val df = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 1), ("B", 3), ("B", 4), ("B", 4))).toDF("c1", "c2") 
df.groupBy("c1", "c2") 
    .count() 
    .groupBy("c1") 
    .agg(max(struct(col("count"), col("c2"))).as("max")) 
    .select("c1", "max.c2") 

有沒有更好的方法?

回答

1

如果您對使用Spark SQL感到滿意,下面的實現將會起作用。 請注意,Spark 1.4中的窗口函數可從Spark 1.4開始使用。

df.registerTempTable("temp_table") 

sqlContext.sql 
(""" 
SELECT c1,c2 FROM 
(SELECT c1,c2, RANK() OVER(PARTITION BY c1 ORDER BY cnt DESC) as rank FROM (
SELECT c1,c2,count(*) as cnt FROM temp_table GROUP BY c1,c2) t0) t1 
WHERE t1.rank = 1 
""").show() 
0
val df = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 1), ("B", 3), ("B", 4), ("B", 4))).toDF("c1", "c2") 

進口org.apache.spark.sql.expressions.Window

VAL overCategory = Window.partitionBy($ 「C1」,$ 「C2」)。ORDERBY($ 「C2」 .desc)

VAL COUNT天= df.withColumn( 「計數」,計數($ 「C2」)。在(overCategory))。dropDuplicates

VAL freqCategory = countd.withColumn( 「MAX」,MAX( $ 「計數」)。在(Window.partitionBy($ 「C1」)))。過濾器($ 「計數」 === $ 「MAX」)。降(「湊nt「,」max「)