
Get the last value in a group in Spark

I have a SparkR DataFrame, shown below:

#Create R data.frame 
custId <- c(rep(1001, 5), rep(1002, 3), 1003) 
date <- c('2013-08-01','2014-01-01','2014-02-01','2014-03-01','2014-04-01','2014-02-01','2014-03-01','2014-04-01','2014-04-01') 
desc <- c('New','New','Good','New', 'Bad','New','Good','Good','New') 
newcust <- c(1,1,0,1,0,1,0,0,1) 
df <- data.frame(custId, date, desc, newcust) 

#Create SparkR DataFrame  
df <- createDataFrame(df) 
display(df) 
     custId | date       | desc | newcust
     ------------------------------------
     1001   | 2013-08-01 | New  | 1
     1001   | 2014-01-01 | New  | 1
     1001   | 2014-02-01 | Good | 0
     1001   | 2014-03-01 | New  | 1
     1001   | 2014-04-01 | Bad  | 0
     1002   | 2014-02-01 | New  | 1
     1002   | 2014-03-01 | Good | 0
     1002   | 2014-04-01 | Good | 0
     1003   | 2014-04-01 | New  | 1

newcust flags a new customer: it is 1 the first time a given custId appears, and again whenever that custId's desc reverts to 'New'. What I want is the last desc value within each newcust grouping, while keeping the first date of each grouping. Below is the DataFrame I am after. How can I do this in Spark? Either PySpark or SparkR code would work.

#What I want 
custId | date       | newcust | finaldesc
-----------------------------------------
1001   | 2013-08-01 | 1       | New
1001   | 2014-01-01 | 1       | Good
1001   | 2014-03-01 | 1       | Bad
1002   | 2014-02-01 | 1       | Good
1003   | 2014-04-01 | 1       | New

Answers


I don't know SparkR, so I will answer in PySpark. You can use window functions to achieve this.

First, let's define the "newcust groupings": you want every row where newcust equals 1 to be the start of a new group, so computing a cumulative sum will do the trick:

from pyspark.sql import Window
import pyspark.sql.functions as psf

# Running total of newcust within each customer, ordered by date:
# the counter increments on every newcust == 1 row, giving a group id.
w1 = Window.partitionBy("custId").orderBy("date")
df1 = df.withColumn("subgroup", psf.sum("newcust").over(w1))

+------+----------+----+-------+--------+
|custId|      date|desc|newcust|subgroup|
+------+----------+----+-------+--------+
|  1001|2013-08-01| New|      1|       1|
|  1001|2014-01-01| New|      1|       2|
|  1001|2014-02-01|Good|      0|       2|
|  1001|2014-03-01| New|      1|       3|
|  1001|2014-04-01| Bad|      0|       3|
|  1002|2014-02-01| New|      1|       1|
|  1002|2014-03-01|Good|      0|       1|
|  1002|2014-04-01|Good|      0|       1|
|  1003|2014-04-01| New|      1|       1|
+------+----------+----+-------+--------+
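
To see why the cumulative sum works as a group id, here is the same idea on a plain Python list, using custId 1001's newcust flags in date order (an illustration only, not part of the Spark answer):

from itertools import accumulate

newcust = [1, 1, 0, 1, 0]             # custId 1001's newcust flags, in date order
subgroup = list(accumulate(newcust))  # each 1 bumps the running total, opening a new group
print(subgroup)                       # [1, 2, 2, 3, 3]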

For each subgroup, we want to keep the first date:

# The earliest date within each (custId, subgroup) pair.
w2 = Window.partitionBy("custId", "subgroup")
df2 = df1.withColumn("first_date", psf.min("date").over(w2))

+------+----------+----+-------+--------+----------+
|custId|      date|desc|newcust|subgroup|first_date|
+------+----------+----+-------+--------+----------+
|  1001|2013-08-01| New|      1|       1|2013-08-01|
|  1001|2014-01-01| New|      1|       2|2014-01-01|
|  1001|2014-02-01|Good|      0|       2|2014-01-01|
|  1001|2014-03-01| New|      1|       3|2014-03-01|
|  1001|2014-04-01| Bad|      0|       3|2014-03-01|
|  1002|2014-02-01| New|      1|       1|2014-02-01|
|  1002|2014-03-01|Good|      0|       1|2014-02-01|
|  1002|2014-04-01|Good|      0|       1|2014-02-01|
|  1003|2014-04-01| New|      1|       1|2014-04-01|
+------+----------+----+-------+--------+----------+

Finally, we keep the last row of each group, ordering by descending date:

# Number the rows of each subgroup from the most recent date backwards;
# row 1 is then the last row, which carries the final desc.
w3 = Window.partitionBy("custId", "subgroup").orderBy(psf.desc("date"))
df3 = df2.withColumn(
    "rn",
    psf.row_number().over(w3)
).filter("rn = 1").select(
    "custId",
    psf.col("first_date").alias("date"),
    "desc"
)

+------+----------+----+
|custId|      date|desc|
+------+----------+----+
|  1001|2013-08-01| New|
|  1001|2014-01-01|Good|
|  1001|2014-03-01| Bad|
|  1002|2014-02-01|Good|
|  1003|2014-04-01| New|
+------+----------+----+
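
For reference, the three steps can also be chained into a single pipeline. This is just the answer above written as one expression; nothing is assumed beyond the original df (the window specs can be defined up front because they are only resolved when each withColumn is analyzed, by which point subgroup exists):

from pyspark.sql import Window
import pyspark.sql.functions as psf

w1 = Window.partitionBy("custId").orderBy("date")
w2 = Window.partitionBy("custId", "subgroup")
w3 = Window.partitionBy("custId", "subgroup").orderBy(psf.desc("date"))

result = (df
    .withColumn("subgroup", psf.sum("newcust").over(w1))   # group id per customer
    .withColumn("first_date", psf.min("date").over(w2))    # earliest date per group
    .withColumn("rn", psf.row_number().over(w3))           # 1 = most recent row per group
    .filter("rn = 1")
    .select("custId", psf.col("first_date").alias("date"), "desc"))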

Here is @Marie's answer above as SparkR code:

# Step 1: cumulative sum of newcust yields the subgroup id
w1 <- orderBy(windowPartitionBy('custId'), df$date)
df1 <- withColumn(df, "subgroup", over(sum(df$newcust), w1))

# Step 2: earliest date within each (custId, subgroup)
w2 <- windowPartitionBy("custId", "subgroup")
df2 <- withColumn(df1, "first_date", over(min(df1$date), w2))

# Step 3: keep the last row (by date) of each subgroup
w3 <- orderBy(windowPartitionBy("custId", "subgroup"), desc(df$date))
df3 <- withColumn(df2, "rn", over(row_number(), w3))
df3 <- select(filter(df3, df3$rn == 1), "custId", "first_date", "desc")
df3 <- withColumnRenamed(df3, 'first_date', "date")

df3 <- arrange(df3, 'custId', 'date')
display(df3)
+------+----------+----+
|custId|      date|desc|
+------+----------+----+
|  1001|2013-08-01| New|
|  1001|2014-01-01|Good|
|  1001|2014-03-01| Bad|
|  1002|2014-02-01|Good|
|  1003|2014-04-01| New|
+------+----------+----+
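
One cosmetic gap remains: the target output in the question also keeps a newcust column (1 on every output row, since each surviving row represents a group that was opened by a newcust = 1 row) and names the last column finaldesc. A minimal PySpark sketch on top of df3 from the first answer; the lit(1) encodes that observation rather than anything computed:

import pyspark.sql.functions as psf

# Every output row stands for a group opened by a newcust == 1 row,
# which is why the question's expected output shows newcust = 1 throughout.
final = (df3
    .withColumn("newcust", psf.lit(1))
    .withColumnRenamed("desc", "finaldesc")
    .select("custId", "date", "newcust", "finaldesc"))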