2017-08-21 94 views
5

我有一個由時間戳列和美元列組成的數據集。我希望找到以每行時間戳結束的每週平均美元數。我最初是在查看pyspark.sql.functions.window函數,但是按星期計算數據。pyspark:使用時間序列數據滾動平均值

下面是一個例子:

%pyspark 
import datetime 
from pyspark.sql import functions as F 

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"]) 
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp')) 

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg')) 
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect() 

這將導致兩個記錄:

|  start  |   end   | avg | 
|---------------------|----------------------|-----| 
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0| 
|---------------------|----------------------|-----| 
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0| 
|---------------------|----------------------|-----| 

窗口功能分級的時間序列數據,而不是執行滾動平均值。

有沒有辦法執行一個滾動平均值,我將得到每行的周平均值,並在該行的timestampGMT處結束一段時間?

編輯:

張以下的答案是接近我想要的,但我想看看沒有什麼。

這裏有一個更好的例子來說明什麼,我想要知道的:

%pyspark 
from pyspark.sql import functions as F 
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days")))) 

這將導致以下數據幀:

dollars timestampGMT   rolling_average 
25  2017-03-18 11:27:18.0 25 
17  2017-03-10 15:27:18.0 15 
13  2017-03-15 12:27:18.0 15 

我想一般是在在時間戳GMT列中繼續執行日期,這將導致:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17 
13  2017-03-15 12:27:18.0 15 
25  2017-03-18 11:27:18.0 19 

在上面的結果中因爲沒有前面的記錄,所以2017-03-10的rolling_average爲17。 2017-03-15的rolling_average值爲15,因爲它在2017-03-15的平均值爲13,而2017-03-10的平均值爲17,這與前7天的窗口一致。 2017-03-18的平均移動平均數爲19,因爲2017-03-18平均數爲25,2017-03-10平均數爲13,與前7天的數據一致,2017年不包括17 -03-10,因爲這與前7天窗口不一致。

有沒有辦法做到這一點,而不是每週窗口不重疊的裝倉窗口?

回答

4

我想通了正確的方法使用該計算器計算移動/滾動平均值:

Spark Window Functions - rangeBetween dates

基本想法是將您的時間戳列轉換爲secon ds,然後可以使用pyspark.sql.Window類中的rangeBetween函數在窗口中包含正確的行。

下面是解決例如:

%pyspark 
from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 

#create window by casting timestamp to long (number of seconds) 
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)) 

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

這導致移動平均數的確切列,我一直在尋找:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17.0 
13  2017-03-15 12:27:18.0 15.0 
25  2017-03-18 11:27:18.0 19.0 
1

你的意思是這樣的:

df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"), 
          (13, "2017-03-11T12:27:18+00:00"), 
          (21, "2017-03-17T11:27:18+00:00")], 
          ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days")))) 

輸出:

+-------+-------------------+---------------+         
|dollars|timestampGMT  |rolling_average| 
+-------+-------------------+---------------+ 
|21  |2017-03-17 19:27:18|21.0   | 
|17  |2017-03-11 23:27:18|15.0   | 
|13  |2017-03-11 20:27:18|15.0   | 
+-------+-------------------+---------------+ 
+0

感謝張,更接近我想要的東西,但不正是我想要的。您的代碼仍然通過日期分箱計算答案。我希望每週的平均水平在該行日期結束。沒有做出一個好榜樣是我的錯。我將用一個更新的示例來編輯我的帖子,以顯示我想要的內容。 –