爲了獲得滾動percent_rank()
,你就必須能夠使用窗框定義排名funtions你根本無法。 (這樣的事情w = Window.orderBy('t', 'x').rowsBetween(-sys.maxsize, 0)
)
我發現了一個辦法解決它,但它涉及到一個笛卡兒連接,這是非常昂貴:
首先,讓我們創建示例數據框:
import pyspark.sql.functions as psf
from pyspark.sql import HiveContext
hc = HiveContext(sc)
df = hc.createDataFrame(sc.parallelize(zip(range(5), [1,3,5,4,2])), ['t', 'x'])
笛卡兒連接:
df2 = df.groupBy(df.x.alias('x2')).agg(psf.min("t").alias("t2"))
df_cross = df.join(df2).filter("t2 <= t").withColumn("isSup", (df.x > df2.x2).cast("int"))
+---+---+---+---+-----+
| t| x| t2| x2|isSup|
+---+---+---+---+-----+
| 1| 3| 0| 1| 1|
| 2| 5| 0| 1| 1|
| 2| 5| 1| 3| 1|
| 3| 4| 0| 1| 1|
| 3| 4| 1| 3| 1|
| 3| 4| 2| 5| 0|
| 4| 2| 0| 1| 1|
| 4| 2| 1| 3| 0|
| 4| 2| 2| 5| 0|
| 4| 2| 3| 4| 0|
+---+---+---+---+-----+
最後,我們通過 't', 'X' 基團:
df_fin = df_cross.groupBy("t", "x").agg(
psf.count("*").alias("count"),
psf.sum("isSup").alias("rank")
).withColumn('pct_rank_win', psf.col("rank")/psf.greatest(psf.col('count') - 1, psf.lit(1)))
+---+---+-----+----+------------------+
| t| x|count|rank| pct_rank_win|
+---+---+-----+----+------------------+
| 0| 1| 1| 0| 0.0|
| 1| 3| 2| 1| 1.0|
| 2| 5| 3| 2| 1.0|
| 3| 4| 4| 2|0.6666666666666666|
| 4| 2| 5| 1| 0.25|
+---+---+-----+----+------------------+
的groupBy('x')
在df2
定義是爲了確保密集的排名(同一值將具有相同的等級),如用下面的例子:
df = hc.createDataFrame(sc.parallelize(zip(range(6), [1,3,3,5,4,2])), ['t', 'x'])
+---+---+-----+----+------------------+
| t| x|count|rank| pct_rank_win|
+---+---+-----+----+------------------+
| 0| 1| 1| 0| 0.0|
| 1| 3| 2| 1| 1.0|
| 2| 3| 2| 1| 1.0|
| 3| 5| 3| 2| 1.0|
| 4| 4| 4| 2|0.6666666666666666|
| 5| 2| 5| 1| 0.25|
+---+---+-----+----+------------------+