2017-07-06 59 views
0

我有一個數據框,其中有子類別,並且需要這些子類別中的每一個的最後一個元素。獲取火花2.1.1中的窗口的最後一個元素

val windowSpec = Window.partitionBy("name").orderBy("count") 
sqlContext 
    .createDataFrame(
     Seq[(String, Int)](
     ("A", 1), 
     ("A", 2), 
     ("A", 3), 
     ("B", 10), 
     ("B", 20), 
     ("B", 30) 
    )) 
    .toDF("name", "count") 
    .withColumn("firstCountOfName", first("count").over(windowSpec)) 
    .withColumn("lastCountOfName", last("count").over(windowSpec)) 
    .show() 

返回我有些奇怪:

+----+-----+----------------+---------------+         
|name|count|firstCountOfName|lastCountOfName| 
+----+-----+----------------+---------------+ 
| B| 10|    10|    10| 
| B| 20|    10|    20| 
| B| 30|    10|    30| 
| A| 1|    1|    1| 
| A| 2|    1|    2| 
| A| 3|    1|    3| 
+----+-----+----------------+---------------+ 

我們可以看到,返回的first值計算正確,但last不是,它總是列的當前值。

有人有辦法做我想做的事嗎?

+0

不是 「排序依據」 +「第一/最後「與」最小「/」最大「相同的列 –

+0

可以;但是我仍然與max有相同的行爲。 – Molochdaa

+0

區別在於,使用'max'不需要orderBy,那麼當您僅使用partitionBy定義windowSpec時,它就可以工作。 –

回答

3

根據問題SPARK-20969,您應該能夠通過爲窗口定義足夠的界限來獲得預期的結果,如下所示。

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 

val windowSpec = Window 
    .partitionBy("name") 
    .orderBy("count") 
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) 

sqlContext 
    .createDataFrame(
    Seq[(String, Int)](
     ("A", 1), 
     ("A", 2), 
     ("A", 3), 
     ("B", 10), 
     ("B", 20), 
     ("B", 30) 
    )) 
    .toDF("name", "count") 
    .withColumn("firstCountOfName", first("count").over(windowSpec)) 
    .withColumn("lastCountOfName", last("count").over(windowSpec)) 
    .show() 

另外,如果你是在你計算第一個和最後的同一列排序,你可以爲minmax與非有序的窗口中更改,那麼它也應該正常工作。

+0

這很完美,謝謝! – Molochdaa

+0

偉大值得讚賞:) –

0

另一種方式做的是使用GROUPBY廣告加入計算第一和最後一個值

val data = spark 
    .createDataFrame(
    Seq[(String, Int)](
     ("A", 1), 
     ("A", 2), 
     ("A", 3), 
     ("B", 10), 
     ("B", 20), 
     ("B", 30) 
    )) 
    .toDF("name", "count") 


val firstLast = data.groupBy("name").agg(first("count").as("firstCountOfName"), last("count").as("lastCountOfName")) 

val result = data.join(firstLast, Seq("name"), "left") 

result.show() 

輸出:

+----+-----+----------------+---------------+ 
|name|count|firstCountOfName|lastCountOfName| 
+----+-----+----------------+---------------+ 
| A| 1|    1|    3| 
| A| 2|    1|    3| 
| A| 3|    1|    3| 
| B| 10|    10|    30| 
| B| 20|    10|    30| 
| B| 30|    10|    30| 
+----+-----+----------------+---------------+ 

希望這有助於