2016-08-03 56 views

回答

4

以下是Python中使用Dataframe API(Spark 1.6 +)的示例實現。

import pyspark.sql.functions as F 
import numpy as np 
from pyspark.sql.types import FloatType 

假設我們有「工資」火花數據幀,如爲客戶月薪:

月| customer_id |工資

,我們想找到貫穿所有月份

每個客戶的平均年薪

第一步:寫一個用戶定義函數來計算平均

def find_median(values_list): 
    try: 
     median = np.median(values_list) #get the median of values in a list in each row 
     return round(float(median),2) 
    except Exception: 
     return None #if there is anything wrong with the given values 

median_finder = F.udf(find_median,FloatType()) 

第2步:在工資彙總列將它們收集到每行中的工資列表中:

salaries_list = salaries.groupBy("customer_id").agg(F.collect_list("salary").alias("salaries")) 

第3步:調用saraies col上的median_finder udf並添加中間值作爲新列

salaries_list = salaries_list.withColumn("median",median_finder("salaries")) 
+1

使用np.nanmedian(values_list)忽略NaN並且有時是更好的選擇 –

相關問題