2017-08-12 81 views
1

我有格式計算總和,多個頂部的K值的計數火花

+---------------------------------+ 
|name| values |score |row_number| 
+---------------------------------+ 
|A |1000 |0  |1  | 
|B |947 |0  |2  | 
|C |923 |1  |3  | 
|D |900 |2  |4  | 
|E |850 |3  |5  | 
|F |800 |1  |6  | 
+---------------------------------+ 

我需要獲得總和(值)時評分> 0和ROW_NUMBER < K(I,E)SUM的輸入數據幀當數據框中的前k個值得分> 0時,所有值的總和。

我可以通過運行以下查詢百強值

val top_100_data = df.select(
     count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"), 
     sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"), 
     sum(when(col("row_number") <=100, col(values))).alias("total_sum_100") 
    ) 

不過,我需要爲高檔100,200,300 ...... 2500取數據,以實現這一目標。這意味着我需要運行這個查詢25次,最後結合25個數據幀。

我是新來的火花,還有很多東西。解決這個問題最好的辦法是什麼?

謝謝!

回答

1

您可以創建一個限制爲Array

val topFilters = Array(100, 200, 300) // you can add more 

然後你就可以通過topFilters陣列循環,並在您需要的dataframe我建議你使用join而不是union,因爲join會給你單獨的columnsunions會給你單獨的rows。你可以做以下

鑑於你dataframe作爲

+----+------+-----+----------+ 
|name|values|score|row_number| 
+----+------+-----+----------+ 
|A |1000 |0 |1   | 
|B |947 |0 |2   | 
|C |923 |1 |3   | 
|D |900 |2 |200  | 
|E |850 |3 |150  | 
|F |800 |1 |250  | 
+----+------+-----+----------+ 

您可以通過使用作爲

import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 
var finalDF : DataFrame = Seq("1").toDF("rowNum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k")) 
    finalDF = finalDF.join(top_100_data, Seq("rowNum")) 
} 
finalDF.show(false) 

上面定義的topFilters陣列應該給你最後dataframe作爲

+------+-------------+-------------+-------------+ 
|rowNum|total_sum_100|total_sum_200|total_sum_300| 
+------+-------------+-------------+-------------+ 
|1  |923   |1773   |3473   | 
+------+-------------+-------------+-------------+ 

您可以爲你的25個限制做同樣的事情。

如果您打算使用union,那麼這個想法與上述類似。

我希望答案是有幫助的

更新

如果您需要工會,那麼你可以申請以下邏輯上面

var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"), 
    sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"), 
    sum(when(col("row_number") <=k, col("values"))).alias("total_sum")) 
    finalDF = finalDF.union(top_100_data) 
} 
finalDF.filter(col("limit") =!= 0).show(false) 

定義相同限制陣列應該給你

+-----+-----+------------+---------+ 
|limit|count|sum_filtered|total_sum| 
+-----+-----+------------+---------+ 
|100 |1 |923   |2870  | 
|200 |3 |2673  |4620  | 
|300 |4 |3473  |5420  | 
+-----+-----+------------+---------+ 
+0

嗨!謝謝你的回答,這非常有幫助!!所以我會每K需要3列(sum_100_filtered_score,total_sum_100,count_filtered_score_100)。通過加入數據集,我爲每個字段獲得一列。這就是爲什麼我試圖使用工會 – Vignesh

+0

然後去。 :)而不是加入你可以使用聯合。 –

+0

看到我更新的答案:)你可以接受和upvote,如果它真的幫助你 –