2017-04-14 24 views
0

我想獲取數據集中的前n項。Spark窗口函數top N項性能問題

最初我做到了。

var df = Seq((1 , "row1") , (2,"row2"), (1,"row11") , (1 , null)).toDF() 

df=df.select($"_1".alias("p_int"), $"_2".alias("p_string")) 

val resultDf =df.where($"p_string".isNotNull).select($"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "ROWNUMBER").where($"ROWNUMBER" <= 2) 

但我想避免的操作 「在哪裏($」 ROWNUMBER 「< = 10)」 的性能成本

所以我決定做以下

var df = Seq((1 , "row1") , (2,"row2"), (1,"row11") , (1 , null)).toDF() 

df=df.select($"_1".alias("p_int"), $"_2".alias("p_string")) 

val test =df.where($"p_string".isNotNull).select($"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "ROWNUMBER") 

implicit val encoder = RowEncoder(test.schema) 

var temp =test.mapPartitions(_.take(2)) 

然而,我的測試似乎表明,這不會產生正確的輸出。

任何想法爲什麼。從窗口數據集獲得的迭代器上的take函數不會獲得迭代器中的前n個元素嗎?

回答

0

Dataset的分區與PARTITION BY子句沒有一一對應關係。 OVER (PARTITION BY ...)中的所有魔法都發生在低得多的水平上,並且單個物理分區將處理多個ID。

此外,你並沒有真正保存工作。要正確生成row_numbers Spark必須對所有數據進行整理,排序和掃描。您需要更低級別的機制來避免完全洗牌和排序(例如Aggregator帶有二進制堆)。