
I have a dataframe as shown below. How do I find the top n elements for a combination of attributes in Spark?

scala> ds.show
    +----+----------+----------+-----+
    | key|attribute1|attribute2|value|
    +----+----------+----------+-----+
    |mac1|        A1|        B1|   10|
    |mac2|        A2|        B1|   10|
    |mac3|        A2|        B1|   10|
    |mac1|        A1|        B2|   10|
    |mac1|        A1|        B2|   10|
    |mac3|        A1|        B1|   10|
    |mac2|        A2|        B1|   10|
    +----+----------+----------+-----+

For each value of attribute1, I want to find the top N keys together with their aggregated values. The aggregated value per (key, attribute1) pair would be:

    +----+----------+-----+
    | key|attribute1|value|
    +----+----------+-----+
    |mac1|        A1|   30|
    |mac2|        A2|   20|
    |mac3|        A2|   10|
    |mac3|        A1|   10|
    +----+----------+-----+

Now, if N = 1, the output would be A1 -> (mac1, 30) and A2 -> (mac2, 20).

How can I achieve this with DataFrame/Dataset? And I want to do this for every attribute column; in the example above, for both attribute1 and attribute2.

Answer


Given the input dataframe as

+----+----------+----------+-----+
|key |attribute1|attribute2|value|
+----+----------+----------+-----+
|mac1|A1        |B1        |10   |
|mac2|A2        |B1        |10   |
|mac3|A2        |B1        |10   |
|mac1|A1        |B2        |10   |
|mac1|A1        |B2        |10   |
|mac3|A1        |B1        |10   |
|mac2|A2        |B1        |10   |
+----+----------+----------+-----+

and aggregation on the above input dataframe as

import org.apache.spark.sql.functions._

// sum the values for each (key, attribute1) pair
val groupeddf = df.groupBy("key", "attribute1").agg(sum("value").as("value"))

should give you

+----+----------+-----+
|key |attribute1|value|
+----+----------+-----+
|mac1|A1        |30.0 |
|mac3|A1        |10.0 |
|mac3|A2        |10.0 |
|mac2|A2        |20.0 |
+----+----------+-----+

Now you can use a Window function to generate a rank for each row of the grouped data, and then filter the rows with rank <= N.

import org.apache.spark.sql.expressions.Window
import spark.implicits._ // for the $"..." column syntax

val N = 1

// partition by attribute1, ordering rows by aggregated value, highest first
val windowSpec = Window.partitionBy("attribute1").orderBy($"value".desc)

groupeddf.withColumn("rank", rank().over(windowSpec))
    .filter($"rank" <= N)
    .drop("rank")
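
Note that rank() assigns tied values the same rank, so a partition can return more than N rows when values tie. If you need exactly N rows per group, row_number() (also from org.apache.spark.sql.functions) is a drop-in replacement; a minimal variant:

groupeddf.withColumn("rn", row_number().over(windowSpec))
    .filter($"rn" <= N)
    .drop("rn")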

Either way, this should give you the dataframe you want

+----+----------+-----+
|key |attribute1|value|
+----+----------+-----+
|mac2|A2        |20.0 |
|mac1|A1        |30.0 |
+----+----------+-----+
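
The question also asks for the same result per attribute2 and, more generally, per attribute column. A minimal sketch of one way to do that, assuming the attribute column names are known up front (topNPerAttribute is a hypothetical helper, not part of the original answer; it just repeats the aggregate-rank-filter pipeline for one column):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// hypothetical helper: top-N keys by aggregated value for a single attribute column
def topNPerAttribute(df: DataFrame, attr: String, n: Int): DataFrame = {
  val w = Window.partitionBy(attr).orderBy(col("value").desc)
  df.groupBy("key", attr).agg(sum("value").as("value"))
    .withColumn("rank", rank().over(w))
    .filter(col("rank") <= n)
    .drop("rank")
}

val topAttr1 = topNPerAttribute(df, "attribute1", 1) // A1 -> (mac1, 30), A2 -> (mac2, 20)
val topAttr2 = topNPerAttribute(df, "attribute2", 1)

Calling the helper once per attribute column avoids writing the groupBy/rank/filter chain by hand for each attribute.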