2016-11-22 93 views
0

在熊貓我有類似火花SQL距離最近的假期

indices = df.dateColumn.apply(holidays.index.searchsorted) 
df['nextHolidays'] = holidays.index[indices] 
df['previousHolidays'] = holidays.index[indices - 1] 

一個函數,它計算到最近的假期並存儲作爲新列的距離。

searchsortedhttp://pandas.pydata.org/pandas-docs/version/0.18.1/generated/pandas.Series.searchsorted.html對於大熊貓來說是一個很好的解決方案,因爲這給了我下一個假期的索引而沒有算法複雜度高的問題Parallelize pandas apply例如,這種方法比並行循環要快得多。

我該如何在火花或蜂房中實現這一點?

回答

1

這可以使用聚合來完成,但是這種方法比pandas方法具有更高的複雜度。但是您可以使用UDF實現類似的性能。它不會像大熊貓一樣優雅,但:

假定該數據集的節日:

holidays = ['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03'] 
index = spark.sparkContext.broadcast(sorted(holidays)) 

而2016年的日期數據集的數據幀:

from datetime import datetime, timedelta 
dates_array = [(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(366)] 
from pyspark.sql import Row 
df = spark.createDataFrame([Row(date=d) for d in dates_array]) 

UDF可以使用熊貓searchsorted,但需要在執行者上安裝熊貓。 insted的,你可以使用Python的計劃是這樣的:

def nearest_holiday(date): 
    last_holiday = index.value[0] 
    for next_holiday in index.value: 
     if next_holiday >= date: 
      break 
     last_holiday = next_holiday 
    if last_holiday > date: 
     last_holiday = None 
    if next_holiday < date: 
     next_holiday = None 
    return (last_holiday, next_holiday) 


from pyspark.sql.types import * 
return_type = StructType([StructField('last_holiday', StringType()), StructField('next_holiday', StringType())]) 

from pyspark.sql.functions import udf 
nearest_holiday_udf = udf(nearest_holiday, return_type) 

,可與withColumn使用:

df.withColumn('holiday', nearest_holiday_udf('date')).show(5, False) 

+----------+-----------------------+ 
|date  |holiday    | 
+----------+-----------------------+ 
|2016-01-01|[null,2016-01-03]  | 
|2016-01-02|[null,2016-01-03]  | 
|2016-01-03|[2016-01-03,2016-01-03]| 
|2016-01-04|[2016-01-03,2016-03-03]| 
|2016-01-05|[2016-01-03,2016-03-03]| 
+----------+-----------------------+ 
only showing top 5 rows 
+0

謝謝,這看起來不錯。我需要將它移植到scala中;) –

+0

你指的是什麼'sorted(holidays)'操作?它是一個pyspark api嗎? –

+0

這是python的。它對UDF進行排序,我可以通過它來查找匹配的日期。 – Mariusz