2017-08-23 20 views
0

我想計算列的percent_rank,但排名應該是相對於而言,只有的值 - 由時間戳變量t確定。如何計算在過去星火時間值`percent_rank`?

看來F.percent_rank()不接受任何參數,並獲得獨立的時間排名percent_rank().over(Window.orderBy("x"))?!

有沒有什麼辦法可以根據具有較小的值的人羣獲得排名 timestamp?

預期的結果是一樣的東西

t  x  perc_rank_win 
0  1  0.0 
1  3  1.0   # since 3 is largest from [1, 3] 
2  5  1.0   # since 5 is largest from [1, 3, 5] 
3  4  0.66   # since values are [1, 3, 4!, 5] 
4  2  0.25   # since [1, 2!, 3, 4, 5] 

回答

1

爲了獲得滾動percent_rank(),你就必須能夠使用窗框定義排名funtions你根本無法。 (這樣的事情w = Window.orderBy('t', 'x').rowsBetween(-sys.maxsize, 0)

我發現了一個辦法解決它,但它涉及到一個笛卡兒連接,這是非常昂貴:

首先,讓我們創建示例數據框:

import pyspark.sql.functions as psf 
from pyspark.sql import HiveContext 
hc = HiveContext(sc) 
df = hc.createDataFrame(sc.parallelize(zip(range(5), [1,3,5,4,2])), ['t', 'x']) 

笛卡兒連接:

df2 = df.groupBy(df.x.alias('x2')).agg(psf.min("t").alias("t2")) 
df_cross = df.join(df2).filter("t2 <= t").withColumn("isSup", (df.x > df2.x2).cast("int")) 

    +---+---+---+---+-----+ 
    | t| x| t2| x2|isSup| 
    +---+---+---+---+-----+ 
    | 1| 3| 0| 1| 1| 
    | 2| 5| 0| 1| 1| 
    | 2| 5| 1| 3| 1| 
    | 3| 4| 0| 1| 1| 
    | 3| 4| 1| 3| 1| 
    | 3| 4| 2| 5| 0| 
    | 4| 2| 0| 1| 1| 
    | 4| 2| 1| 3| 0| 
    | 4| 2| 2| 5| 0| 
    | 4| 2| 3| 4| 0| 
    +---+---+---+---+-----+ 

最後,我們通過 't', 'X' 基團:

df_fin = df_cross.groupBy("t", "x").agg(
    psf.count("*").alias("count"), 
    psf.sum("isSup").alias("rank") 
).withColumn('pct_rank_win', psf.col("rank")/psf.greatest(psf.col('count') - 1, psf.lit(1))) 

    +---+---+-----+----+------------------+ 
    | t| x|count|rank|  pct_rank_win| 
    +---+---+-----+----+------------------+ 
    | 0| 1| 1| 0|    0.0| 
    | 1| 3| 2| 1|    1.0| 
    | 2| 5| 3| 2|    1.0| 
    | 3| 4| 4| 2|0.6666666666666666| 
    | 4| 2| 5| 1|    0.25| 
    +---+---+-----+----+------------------+ 

groupBy('x')df2定義是爲了確保密集的排名(同一值將具有相同的等級),如用下面的例子:

df = hc.createDataFrame(sc.parallelize(zip(range(6), [1,3,3,5,4,2])), ['t', 'x']) 

    +---+---+-----+----+------------------+ 
    | t| x|count|rank|  pct_rank_win| 
    +---+---+-----+----+------------------+ 
    | 0| 1| 1| 0|    0.0| 
    | 1| 3| 2| 1|    1.0| 
    | 2| 3| 2| 1|    1.0| 
    | 3| 5| 3| 2|    1.0| 
    | 4| 4| 4| 2|0.6666666666666666| 
    | 5| 2| 5| 1|    0.25| 
    +---+---+-----+----+------------------+ 
1

這裏的另一種解決辦法我試圖用collect_set在窗口分區,

from pyspark.sql import SparkSession 
from pyspark.sql import Window 
from pyspark.sql import functions as F 

spark = SparkSession.builder.getOrCreate() 
df = spark.createDataFrame([(0,1),(1,3),(2,5),(3,4),(4,2)],['t','x']) 
df.show() 
+---+---+ 
| t| x| 
+---+---+ 
| 0| 1| 
| 1| 3| 
| 2| 5| 
| 3| 4| 
| 4| 2| 
+---+---+ 
w = Window.orderBy('t') 
df = df.withColumn('somecol',F.collect_set('x').over(w)) 
df.show() 
+---+---+---------------+ 
| t| x|  somecol| 
+---+---+---------------+ 
| 0| 1|   [1]| 
| 1| 3|   [1, 3]| 
| 2| 5|  [1, 5, 3]| 
| 3| 4| [1, 5, 3, 4]| 
| 4| 2|[1, 5, 2, 3, 4]| 
+---+---+---------------+ 
def pct_rank(s,v): 
    x=sorted(s) 
    if len(x) == 1: 
     return float(0) 
    else: 
     pc = float(1)/(len(x)-1) 
     idx = x.index(v) 
     return float("{0:.2f}".format(idx*pc)) 

pct_udf = F.udf(pct_rank) 
df.select("t","x",pct_udf(df['somecol'],df['x']).alias('pct_rank')).show() 
+---+---+--------+ 
| t| x|pct_rank| 
+---+---+--------+ 
| 0| 1|  0.0| 
| 1| 3|  1.0| 
| 2| 5|  1.0| 
| 3| 4| 0.67| 
| 4| 2| 0.25| 
+---+---+--------+