2017-04-13 23 views
1

我有一組帶時間戳的位置數據,其中包含一組附加到每個位置的字符串要素ID。我想用一個窗口在火花齊心協力所有這些數組享有跨越當前N ID字符串和接下來的N行,鼻翼:用戶定義的窗口中的所有行的功能

import sys 
from pyspark.sql.window import Window 
import pyspark.sql.functions as func 
windowSpec = Window \ 
    .partitionBy(df['userid']) \ 
    .orderBy(df['timestamp']) \ 
    .rowsBetween(-50, 50) 

dataFrame = sqlContext.table("locations") 
featureIds = featuresCollector(dataFrame['featureId']).over(windowSpec) 
dataFrame.select(
    dataFrame['product'], 
    dataFrame['category'], 
    dataFrame['revenue'], 
    featureIds.alias("allFeatureIds")) 

這可能與Spark和如果是這樣,怎麼我可以寫一個像featuresCollector這樣的函數來收集窗口中的所有特徵ID嗎?

回答

1

Spark UDF不能用於聚合。 Spark提供了許多可用於自定義聚合的工具(UserDefinedAggregateFunctions,Aggregators,AggregateExpressions),其中一些工具可用於窗口化,但沒有一個可以在Python中定義。

如果你想要收集的記錄,collect_list應該做的伎倆。請記住,這是一個非常昂貴的操作。

from pyspark.sql.functions import collect_list 

featureIds = collect_list('featureId').over(windowSpec)