
Create a calculated column based on columns of the current row and the previous row

I have a complex piece of logic to implement. I have tried for a while but still have no clue; please help me check whether it is feasible and how to do it. Thank you very much!

I have the following Spark SQL DataFrame (datetime is increasing, the 'type' values repeat, and every section of a given type always starts with 'flag' = 1):

+---------+-----+----+-----+ 
|datetime |type |flag|value| 
+---------+-----+----+-----+ 
|20170901 |A |1 | 560| 
|20170902 |A |0 | 3456| 
|20170903 |A |0 | 50| 
|20170904 |A |0 | 789| 
...... 
|20170912 |B |1 | 345| 
|20170913 |B |0 | 4510| 
|20170915 |B |0 | 508| 
...... 
|20170919 |C |1 | 45| 
|20170923 |C |0 | 410| 
|20170925 |C |0 | 108| 
...... 
|20171001 |A |1 | 198| 
|20171002 |A |0 | 600| 
|20171005 |A |0 | 675| 
|20171008 |A |0 | 987| 
...... 

I need to create a calculated column based on the previous row and the current row, so that I end up with a DataFrame like the one below (the computed field Seq is an increasing section sequence number):

+---------+-----+----+-----+-----+ 
|datetime |type |flag|value| Seq| 
+---------+-----+----+-----+-----+ 
|20170901 |A |1 | 560| 1| 
|20170902 |A |0 | 3456| 1| 
|20170903 |A |0 | 50| 1| 
|20170904 |A |0 | 789| 1| 
...... 
|20170912 |B |1 | 345| 2| 
|20170913 |B |0 | 4510| 2| 
|20170915 |B |0 | 508| 2| 
...... 
|20170919 |C |1 | 45| 3| 
|20170923 |C |0 | 410| 3| 
|20170925 |C |0 | 108| 3| 
...... 
|20171001 |A |1 | 198| 4| 
|20171002 |A |0 | 600| 4| 
|20171005 |A |0 | 675| 4| 
|20171008 |A |0 | 987| 4| 
...... 

Any clue is appreciated. Here is the code I wrote (thanks to https://stackoverflow.com/users/1592191/mrsrinivas):

from pyspark.sql import SQLContext, Row 
from pyspark import SparkConf, SparkContext 
from pyspark.sql.session import SparkSession 
from pyspark.sql.window import Window 
import pyspark.sql.functions as func 
import sys 

conf = SparkConf().setMaster("local[2]") 
conf = conf.setAppName("test") 
sc = SparkContext.getOrCreate(conf=conf) 
spark = SparkSession(sc) 
rdd = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56), 
         (20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453), 
         (20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108), 
         (20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)]) 
df = spark.createDataFrame(rdd, ["datatime", "type", "flag", "value"]) 
df.show() 

windowSpec = Window.partitionBy(df['type']).orderBy(df['flag'].desc()).rangeBetween(-sys.maxsize, sys.maxsize) 
df.withColumn('Seq', func.dense_rank().over(windowSpec)) 
df.show() 

But I ran into an error: Py4JJavaError: An error occurred while calling o514.withColumn. : org.apache.spark.sql.AnalysisException: Window Frame RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW; Any ideas?
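
For reference, the exception comes from attaching an explicit frame to a ranking function: dense_rank() in Spark must run over the default frame from UNBOUNDED PRECEDING to CURRENT ROW, so it cannot be combined with rangeBetween(-sys.maxsize, sys.maxsize). A minimal sketch that only avoids the exception (it does not by itself produce the desired Seq column, and the name df_ranked is just an illustrative choice) would drop the explicit frame and reassign the result, since withColumn returns a new DataFrame:

windowSpec = Window.partitionBy(df['type']).orderBy(df['flag'].desc())   # no explicit frame: ranking functions use the required default
df_ranked = df.withColumn('Seq', func.dense_rank().over(windowSpec))     # reassign: withColumn returns a new DataFrame
df_ranked.show()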

Answers


Hope this helps!

from pyspark.sql.window import Window
from pyspark.sql.functions import col, monotonically_increasing_id, when, last
import sys

# sample data (assumes an existing SparkContext `sc`)
df = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56),
                     (20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453),
                     (20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108),
                     (20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)]).\
    toDF(["datetime", "type", "flag", "value"])

# tag every row with a globally increasing (but not consecutive) id
df = df.withColumn("row_id", monotonically_increasing_id())
w = Window.partitionBy(col("type")).orderBy(col('datetime'))
# keep the id only on rows that open a section (flag == 1) ...
df1 = df.withColumn("seq_temp", when(col('flag') == 1, col('row_id')).otherwise(None))
# ... then carry it forward over the rest of the section with last(..., ignorenulls=True)
df1 = df1.withColumn("seq", last('seq_temp', True).over(w.rowsBetween(-sys.maxsize, 0))).\
    drop('row_id', 'seq_temp').\
    sort('seq')
df1.show()

The output is:

+--------+----+----+-----+----------+ 
|datetime|type|flag|value|  seq| 
+--------+----+----+-----+----------+ 
|20170901| A| 1| 560|   0| 
|20170902| A| 0| 3560|   0| 
|20170903| A| 0| 50|   0| 
|20170904| A| 0| 56|   0| 
|20170913| B| 0| 4510|   4| 
|20170912| B| 1| 345|   4| 
|20170915| B| 0| 453|   4| 
|20170919| C| 1| 55|8589934592| 
|20170925| C| 0| 108|8589934592| 
|20170923| C| 0| 410|8589934592| 
|20171001| A| 1| 189|8589934595| 
|20171008| A| 0| 956|8589934595| 
|20171005| A| 0| 650|8589934595| 
|20171002| A| 0| 600|8589934595| 
+--------+----+----+-----+----------+ 

The seq values are not a perfect sequence, but they are monotonically increasing.
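
The large jumps come from monotonically_increasing_id(), which puts the Spark partition id in the upper bits of the generated id, so the values are unique and increasing but not consecutive. If a dense 1, 2, 3, ... numbering were needed, a minimal sketch building on df1 above (the names df2 and seq_dense are just illustrative choices, and the unpartitioned window moves all rows onto a single partition, which is fine for small data) could be:

from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

w_all = Window.orderBy('seq')   # no partitionBy: all rows go through one partition
df2 = df1.withColumn('seq_dense', dense_rank().over(w_all))
df2.show()

This is essentially what the second answer below does, using dense_rank over a constant seq_item partition.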


Thank you very much, Prem! It works well, no matter how large the seq numbers get. – peter


@peter Glad it helped :) Perhaps you should [accept it as the correct answer](https://stackoverflow.com/help/someone-answers) so that the question is marked as resolved. – Prem


Of course, Prem. Accepted! – peter


You can use the code below; I have modified it so that 'A' appears twice:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, monotonically_increasing_id, when, last, lit
import pyspark.sql.functions as func
import sys

# sample data (assumes an existing SparkContext `sc`)
df = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56),
                     (20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453),
                     (20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108),
                     (20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)]).\
    toDF(["datetime", "type", "flag", "value"])

# same idea as the answer above: propagate the row_id of each flag == 1 row through its section
df = df.withColumn("row_id", monotonically_increasing_id())
w = Window.partitionBy(col("type")).orderBy(col('datetime'))
df1 = df.withColumn("seq_temp", when(col('flag') == 1, col('row_id')).otherwise(None))
df1 = df1.withColumn("seq", last('seq_temp', True).over(w.rowsBetween(-sys.maxsize, 0))).sort('seq')
# densify the gappy seq values into 1, 2, 3, ... with dense_rank over a single global partition
r = df1.withColumn('seq_item', lit(0))
windowSpec = Window.partitionBy(r['seq_item']).orderBy(r['seq'])
s = r.withColumn('seq_1', func.dense_rank().over(windowSpec)).drop('seq_temp', 'seq', 'row_id')
s.show()

+--------+----+----+-----+--------+-----+ 
|datetime|type|flag|value|seq_item|seq_1| 
+--------+----+----+-----+--------+-----+ 
|20170901| A| 1| 560|  0| 1| 
|20170902| A| 0| 3560|  0| 1| 
|20170903| A| 0| 50|  0| 1| 
|20170904| A| 0| 56|  0| 1| 
|20170912| B| 1| 345|  0| 2| 
|20170913| B| 0| 4510|  0| 2| 
|20170915| B| 0| 453|  0| 2| 
|20170919| C| 1| 55|  0| 3| 
|20170923| C| 0| 410|  0| 3| 
|20170925| C| 0| 108|  0| 3| 
|20171001| A| 1| 189|  0| 4| 
|20171002| A| 0| 600|  0| 4| 
|20171005| A| 0| 650|  0| 4| 
|20171008| A| 0| 956|  0| 4| 
+--------+----+----+-----+--------+-----+ 

Thanks. But the fourth section (in the 'type' column) is 'A' rather than 'D', which is what makes it complicated. – peter
