2016-05-03 150 views
3

我有一個Spark數據框,其中有Date,GroupPrice列。如何使用python中的Spark數據框和GroupBy派生百分位數

我試圖在Python中爲 數據框的Price列推導percentile(0.6)。此外,我需要將輸出添加爲新列。

我嘗試下面的代碼:

perudf = udf(lambda x: x.quantile(.6)) 
df1 = df.withColumn("Percentile", df.groupBy("group").agg("group"),perudf('price')) 

,但它拋出以下錯誤:

assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" 
AssertionError: all exprs should be Column 

回答

3

您可以使用 「percentile_approx」 使用SQL。在pyspark中很難創建UDF。

參考此鏈接等詳細資料:https://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%[email protected].com%3E

+2

對於那些感興趣的/懶惰的,那是'從pyspark import SparkContext,HiveContext; sc = SparkContext(); hiveContext = HiveContext(sc); hiveContext.registerDataFrameAsTable(df,「df」); hiveContext.sql(「SELECT percentntinti(price,0.75)FROM df」);'將價格定在第75百分位。 –

1

我知道一個解決方案來獲得與RDDS每一行的百分比。首先,你的RDD轉換成數據幀:

# convert to rdd of dicts 
rdd = df.rdd 
rdd = rdd.map(lambda x: x.asDict()) 

然後,你可以計算每行的位數:

column_to_decile = 'price' 
total_num_rows = rdd.count() 


def add_to_dict(_dict, key, value): 
    _dict[key] = value 
    return _dict 


def get_percentile(x, total_num_rows): 
    _dict, row_number = x 
    percentile = x[1]/float(total_num_rows) 
    return add_to_dict(_dict, "percentile", percentile) 


rdd_percentile = rdd.map(lambda d: (d[column_to_decile], d)) # make column_to_decile a key 
rdd_percentile = rdd_percentile.sortByKey(ascending=False) # so 1st decile has largest 
rdd_percentile = rdd_percentile.map(lambda x: x[1]) # remove key 
rdd_percentile = rdd_percentile.zipWithIndex() # append row number 
rdd_percentile = rdd_percentile.map(lambda x: get_percentile(x, total_num_rows)) 

最後,轉換回一個數據幀有:

df = sqlContext.createDataFrame(rdd_percentile) 

要得到最接近百分位數的行爲0.6,你可以這樣做:

from pyspark.sql.types import * 
from pyspark.sql.functions import udf 


def get_row_with_percentile(df, percentile): 
    func = udf(lambda x: abs(x), DoubleType()) 
    df_distance = df.withColumn("distance", func(df['percentile'] - percentile)) 
    min_distance = df_distance.groupBy().min('distance').collect()[0]['min(distance)'] 
    result = df_distance.filter(df_distance['distance'] == min_distance) 
    result.drop("distance") 
    return result 


get_row_with_percentile(df, 0.6).show() 
0

您可以使用窗口功能,只需定義一個聚集窗口(在你的情況下,所有的數據),然後通過百分位值過濾:

from pyspark.sql.window import Window 
from pyspark.sql.functions import percent_rank 

w = Window.orderBy(df.price) 
df.select('price', percent_rank().over(w).alias("percentile"))\ 
    .where('percentile == 0.6').show() 

percent_rankpyspark.sql.functions

可如果你喜歡,你可以使用這個SQL接口在這個databricks post

相關問題