I know of one solution for getting the percentile of every row using RDDs. First, convert your DataFrame into an RDD of dicts:
# convert to rdd of dicts
rdd = df.rdd
rdd = rdd.map(lambda x: x.asDict())
Then you can compute each row's percentile:
column_to_decile = 'price'
total_num_rows = rdd.count()

def add_to_dict(_dict, key, value):
    _dict[key] = value
    return _dict

def get_percentile(x, total_num_rows):
    _dict, row_number = x
    percentile = row_number / float(total_num_rows)
    return add_to_dict(_dict, "percentile", percentile)

rdd_percentile = rdd.map(lambda d: (d[column_to_decile], d))  # make column_to_decile a key
rdd_percentile = rdd_percentile.sortByKey(ascending=False)    # so the 1st decile has the largest values
rdd_percentile = rdd_percentile.map(lambda x: x[1])           # remove key
rdd_percentile = rdd_percentile.zipWithIndex()                # append row number
rdd_percentile = rdd_percentile.map(lambda x: get_percentile(x, total_num_rows))
Finally, convert back to a DataFrame with:
df = sqlContext.createDataFrame(rdd_percentile)
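As a quick sanity check, here is a minimal end-to-end sketch of the pipeline above on a toy 'price' column; the SQLContext setup and the sample values are illustrative assumptions, not part of the original answer:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext()
sqlContext = SQLContext(sc)

# toy DataFrame with a 'price' column (values made up for illustration)
df = sqlContext.createDataFrame([Row(price=p) for p in [10.0, 20.0, 30.0, 40.0, 50.0]])

# the same pipeline as above, collapsed into one chain
rdd = df.rdd.map(lambda x: x.asDict())
total_num_rows = rdd.count()
rdd_percentile = (rdd.map(lambda d: (d['price'], d))
                     .sortByKey(ascending=False)
                     .map(lambda x: x[1])
                     .zipWithIndex()
                     .map(lambda x: dict(x[0], percentile=x[1] / float(total_num_rows))))
df = sqlContext.createDataFrame(rdd_percentile)
df.show()  # each row now carries a 'percentile' column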
To get the row whose percentile is closest to, say, 0.6, you can do the following:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def get_row_with_percentile(df, percentile):
    func = udf(lambda x: abs(x), DoubleType())
    df_distance = df.withColumn("distance", func(df['percentile'] - percentile))
    min_distance = df_distance.groupBy().min('distance').collect()[0]['min(distance)']
    result = df_distance.filter(df_distance['distance'] == min_distance)
    result = result.drop("distance")
    return result
get_row_with_percentile(df, 0.6).show()
For those interested/lazy, this gets the price at the 75th percentile directly: 'from pyspark import SparkContext; from pyspark.sql import HiveContext; sc = SparkContext(); hiveContext = HiveContext(sc); hiveContext.registerDataFrameAsTable(df, "df"); hiveContext.sql("SELECT percentile(price, 0.75) FROM df")'.
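For completeness, here is a minimal runnable sketch of that SQL route; note that HiveContext lives in pyspark.sql (not pyspark), and the df with its 'price' column is assumed to exist as above:

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hiveContext = HiveContext(sc)

# expose the DataFrame as a temporary table and call Hive's percentile UDAF
hiveContext.registerDataFrameAsTable(df, "df")
# Hive's exact percentile() expects an integer column; for double columns, percentile_approx() is the usual choice
hiveContext.sql("SELECT percentile_approx(price, 0.75) FROM df").show()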