2015-11-23 134 views
61

我已經產生了數據幀如下:如何選擇每組的第一行?

df.groupBy($"Hour", $"Category") 
    .agg(sum($"value") as "TotalValue") 
    .sort($"Hour".asc, $"TotalValue".desc)) 

結果如下所示:

+----+--------+----------+ 
|Hour|Category|TotalValue| 
+----+--------+----------+ 
| 0| cat26|  30.9| 
| 0| cat13|  22.1| 
| 0| cat95|  19.6| 
| 0| cat105|  1.3| 
| 1| cat67|  28.5| 
| 1| cat4|  26.8| 
| 1| cat13|  12.6| 
| 1| cat23|  5.3| 
| 2| cat56|  39.6| 
| 2| cat40|  29.7| 
| 2| cat187|  27.9| 
| 2| cat68|  9.8| 
| 3| cat8|  35.6| 
| ...| ....|  ....| 
+----+--------+----------+ 

正如你所看到的,數據幀由Hour在遞增順序在排序,然後按TotalValue降序。

我想選擇各組的最上面一行,即

  • 由選自小時的組小時== 0選擇(0,cat26,30.9)
  • 的== 1選自小時== 2選擇(2,cat56,39.6)的選擇(1,cat67,28.5)

因此所需的輸出將是:

+----+--------+----------+ 
|Hour|Category|TotalValue| 
+----+--------+----------+ 
| 0| cat26|  30.9| 
| 1| cat67|  28.5| 
| 2| cat56|  39.6| 
| 3| cat8|  35.6| 
| ...|  ...|  ...| 
+----+--------+----------+ 

也可以很方便地選擇每個組的前N行。

任何幫助,高度讚賞。

回答

107

窗口功能

像這樣的東西應該做的伎倆:

import org.apache.spark.sql.functions.{row_number, max, broadcast} 
import org.apache.spark.sql.expressions.Window 

val df = sc.parallelize(Seq(
    (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3), 
    (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3), 
    (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8), 
    (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue") 

val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc) 

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") 

dfTop.show 
// +----+--------+----------+ 
// |Hour|Category|TotalValue| 
// +----+--------+----------+ 
// | 0| cat26|  30.9| 
// | 1| cat67|  28.5| 
// | 2| cat56|  39.6| 
// | 3| cat8|  35.6| 
// +----+--------+----------+ 

這種方法將在顯著數據不對稱的情況下,效率不高。

平原SQL聚集其次join

val dfMax = df.groupBy($"hour").agg(max($"TotalValue")) 

val dfTopByJoin = df.join(broadcast(dfMax), 
    ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value")) 
    .drop("max_hour") 
    .drop("max_value") 

dfTopByJoin.show 

// +----+--------+----------+ 
// |Hour|Category|TotalValue| 
// +----+--------+----------+ 
// | 0| cat26|  30.9| 
// | 1| cat67|  28.5| 
// | 2| cat56|  39.6| 
// | 3| cat8|  35.6| 
// +----+--------+----------+ 

它將會繼續重複值(如果有超過每小時一類具有:

或者你可以用聚合數據幀加盟總值相同)。您可以按以下方式刪除這些:

dfTopByJoin 
    .groupBy($"hour") 
    .agg(
    first("category").alias("category"), 
    first("TotalValue").alias("TotalValue")) 

使用排序超過structs

整潔,雖然不是很好的測試,不需要招加入或窗口功能:

val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) 
    .groupBy($"hour") 
    .agg(max("vs").alias("vs")) 
    .select($"Hour", $"vs.Category", $"vs.TotalValue") 

dfTop.show 
// +----+--------+----------+ 
// |Hour|Category|TotalValue| 
// +----+--------+----------+ 
// | 0| cat26|  30.9| 
// | 1| cat67|  28.5| 
// | 2| cat56|  39.6| 
// | 3| cat8|  35.6| 
// +----+--------+----------+ 

With DataSet API(Spark 1.6+,2.0+):

火花1.6

case class Record(Hour: Integer, Category: String, TotalValue: Double) 

df.as[Record] 
    .groupBy($"hour") 
    .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) 
    .show 

// +---+--------------+ 
// | _1|   _2| 
// +---+--------------+ 
// |[0]|[0,cat26,30.9]| 
// |[1]|[1,cat67,28.5]| 
// |[2]|[2,cat56,39.6]| 
// |[3]| [3,cat8,35.6]| 
// +---+--------------+ 

火花2。0或更高版本

df.as[Record] 
    .groupByKey(_.Hour) 
    .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y) 

最後兩個方法可以利用地圖側相結合,不需要全面洗牌所以大部分的時間應該比窗口功能,並加入表現出更好的性能。

不要使用

df.orderBy(...).groupBy(...).agg(first(...), ...) 

這似乎是工作(尤其是在local模式),但它是不可靠的(SPARK-16207)。點數爲Tzach Zoharlinking relevant JIRA issue

同樣的說明適用於

df.orderBy(...).dropDuplicates(...) 

內部使用等效的執行計劃。

+1

它看起來像火花,因爲它1.6爲[ROW_NUMBER()](https://spark.apache.org/docs/latest/api/scala/index .html#org.apache.spark.sql.functions $ @ row_number():org.apache.spark.sql.Column)而不是rowNumber –

+0

關於不要使用df.orderBy(...)。gropBy(。 ..)。在什麼情況下我們可以依靠orderBy(...)?或者如果我們不能確定orderBy()是否會給出正確的結果,我們還有什麼替代方法? –

+0

我可能忽略了一些東西,但總的來說建議[避免groupByKey](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html),而不是reduceByKey應該是用過的。另外,您將保存一行。 – Thomas

0

如果數據幀必須由多列進行分組,這樣可以幫助

val keys = List("Hour", "Category"); 
val selectFirstValueOfNoneGroupedColumns = 
df.columns 
    .filterNot(keys.toSet) 
    .map(_ -> "first").toMap 
val grouped = 
df.groupBy(keys.head, keys.tail: _*) 
    .agg(selectFirstValueOfNoneGroupedColumns) 

希望這有助於有人用類似的問題

+1

這是不正確的,因爲與[這裏]所述相同的原因(https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group#comment78445228_45602100)。 – zero323

3

火花2.0.2與多列分組:

import org.apache.spark.sql.functions.row_number 
import org.apache.spark.sql.expressions.Window 

val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc) 

val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") 
-3

我們可以使用rank()窗口函數(您將選擇rank = 1) rank只是爲組的每一行添加一個數字(在此情況下它將是小時)

這裏是一個例子。 (從https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank

val dataset = spark.range(9).withColumn("bucket", 'id % 3) 

import org.apache.spark.sql.expressions.Window 
val byBucket = Window.partitionBy('bucket).orderBy('id) 

scala> dataset.withColumn("rank", rank over byBucket).show 
+---+------+----+ 
| id|bucket|rank| 
+---+------+----+ 
| 0|  0| 1| 
| 3|  0| 2| 
| 6|  0| 3| 
| 1|  1| 1| 
| 4|  1| 2| 
| 7|  1| 3| 
| 2|  2| 1| 
| 5|  2| 2| 
| 8|  2| 3| 
+---+------+----+ 
-1

對於火花> 2.0我們可以簡單地做:
groupBy($"Hour").agg(df_op.columns.map((_, "first")).toMap)

使用OP的詳細設置:

val df_op = df.groupBy($"Hour", $"Category") 
    .agg(sum($"value") as "TotalValue") 
    .sort($"Hour".asc, $"TotalValue".desc)) 

df_op.groupBy($"Hour").agg(df_op.columns.map((_, "first")).toMap) 

這是使用的RelationalGroupedDatasetagg方法到Compute aggregates by specifying a map from column name to aggregate methods.first是一個sql聚合函數。

+4

'groupBy'不一定保存順序,因此'first'可能不會根據預先完成的排序返回第一個項目,請參閱https://issues.apache.org/jira/browse/SPARK-16207 –

-2

在這裏,你可以這樣做 -

val data = df.groupBy("Hour").agg(first("Hour").as("_1"),first("Category").as("Category"),first("TotalValue").as("TotalValue")).drop("Hour") 

data.withColumnRenamed("_1","Hour").show