這可以使用聚合來完成,但是這種方法比pandas方法具有更高的複雜度。但是您可以使用UDF實現類似的性能。它不會像大熊貓一樣優雅,但:
假定該數據集的節日:
holidays = ['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03']
index = spark.sparkContext.broadcast(sorted(holidays))
而2016年的日期數據集的數據幀:
from datetime import datetime, timedelta
dates_array = [(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(366)]
from pyspark.sql import Row
df = spark.createDataFrame([Row(date=d) for d in dates_array])
UDF可以使用熊貓searchsorted
,但需要在執行者上安裝熊貓。 insted的,你可以使用Python的計劃是這樣的:
def nearest_holiday(date):
last_holiday = index.value[0]
for next_holiday in index.value:
if next_holiday >= date:
break
last_holiday = next_holiday
if last_holiday > date:
last_holiday = None
if next_holiday < date:
next_holiday = None
return (last_holiday, next_holiday)
from pyspark.sql.types import *
return_type = StructType([StructField('last_holiday', StringType()), StructField('next_holiday', StringType())])
from pyspark.sql.functions import udf
nearest_holiday_udf = udf(nearest_holiday, return_type)
,可與withColumn
使用:
df.withColumn('holiday', nearest_holiday_udf('date')).show(5, False)
+----------+-----------------------+
|date |holiday |
+----------+-----------------------+
|2016-01-01|[null,2016-01-03] |
|2016-01-02|[null,2016-01-03] |
|2016-01-03|[2016-01-03,2016-01-03]|
|2016-01-04|[2016-01-03,2016-03-03]|
|2016-01-05|[2016-01-03,2016-03-03]|
+----------+-----------------------+
only showing top 5 rows
謝謝,這看起來不錯。我需要將它移植到scala中;) –
你指的是什麼'sorted(holidays)'操作?它是一個pyspark api嗎? –
這是python的。它對UDF進行排序,我可以通過它來查找匹配的日期。 – Mariusz