Create a computed column based on columns of the current and the previous row

I have some complex logic to implement and, after trying for a while, I still have no clue. Please help me check whether it is feasible and, if so, how to do it. Thank you very much!!

I have the following Spark SQL DataFrame (datetime is increasing, the type values repeat, and each section of a given type always starts with flag = 1):
+---------+-----+----+-----+
|datetime |type |flag|value|
+---------+-----+----+-----+
|20170901 |A    |1   |  560|
|20170902 |A    |0   | 3456|
|20170903 |A    |0   |   50|
|20170904 |A    |0   |  789|
......
|20170912 |B    |1   |  345|
|20170913 |B    |0   | 4510|
|20170915 |B    |0   |  508|
......
|20170919 |C    |1   |   45|
|20170923 |C    |0   |  410|
|20170925 |C    |0   |  108|
......
|20171001 |A    |1   |  198|
|20171002 |A    |0   |  600|
|20171005 |A    |0   |  675|
|20171008 |A    |0   |  987|
......
I need to create a computed column based on the previous row and the current row, so that I end up with the DataFrame below (the computed field Seq numbers the sections consecutively: it increases by 1 at every flag = 1 row, in datetime order):
+---------+-----+----+-----+-----+
|datetime |type |flag|value|  Seq|
+---------+-----+----+-----+-----+
|20170901 |A    |1   |  560|    1|
|20170902 |A    |0   | 3456|    1|
|20170903 |A    |0   |   50|    1|
|20170904 |A    |0   |  789|    1|
......
|20170912 |B    |1   |  345|    2|
|20170913 |B    |0   | 4510|    2|
|20170915 |B    |0   |  508|    2|
......
|20170919 |C    |1   |   45|    3|
|20170923 |C    |0   |  410|    3|
|20170925 |C    |0   |  108|    3|
......
|20171001 |A    |1   |  198|    4|
|20171002 |A    |0   |  600|    4|
|20171005 |A    |0   |  675|    4|
|20171008 |A    |0   |  987|    4|
......
Any clues appreciated. Here is the code I wrote (thanks to https://stackoverflow.com/users/1592191/mrsrinivas):
from pyspark.sql import SQLContext, Row
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as func
import sys
conf = SparkConf().setMaster("local[2]")
conf = conf.setAppName("test")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
rdd = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56),
(20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453),
(20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108),
(20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)])
df = spark.createDataFrame(rdd, ["datetime", "type", "flag", "value"])
df.show()
# Window over each type, ordered by flag descending, spanning the whole partition
windowSpec = Window.partitionBy(df['type']).orderBy(df['flag'].desc()).rangeBetween(-sys.maxsize, sys.maxsize)
df = df.withColumn('Seq', func.dense_rank().over(windowSpec))
df.show()
But I ran into an error: Py4JJavaError: An error occurred while calling o514.withColumn. : org.apache.spark.sql.AnalysisException: Window Frame RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW; Any ideas?
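The cause of the AnalysisException is that ranking functions such as dense_rank() only work with Spark's default growing frame (unbounded preceding to current row), so an explicit rangeBetween is rejected. Prem's accepted answer is not quoted in this thread; the sketch below is a reconstruction of the usual fix, not necessarily his exact code. Since every section starts with a flag = 1 row, a running sum of flag in datetime order yields Seq directly, and sum() (unlike dense_rank()) does accept an explicit rows frame:

from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as func

spark = SparkSession.builder.master("local[2]").appName("test").getOrCreate()
df = spark.createDataFrame(
    [(20170901, "A", 1, 560), (20170902, "A", 0, 3456), (20170903, "A", 0, 50),
     (20170912, "B", 1, 345), (20170913, "B", 0, 4510),
     (20171001, "A", 1, 198), (20171002, "A", 0, 600)],
    ["datetime", "type", "flag", "value"])

# Every section begins with a flag = 1 row, so the cumulative sum of flag
# up to and including the current row is exactly the section number.
windowSpec = Window.orderBy("datetime").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("Seq", func.sum("flag").over(windowSpec)).show()

Note that a window with no partitionBy pulls all rows into a single partition; that is unavoidable for a global sequence like this one, but worth keeping in mind on larger data.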
Thank you very much Prem! It works well no matter how large the Seq number gets. – peter
@peter Glad it helped :) Perhaps you should [accept it as the correct answer](https://stackoverflow.com/help/someone-answers) so that the question is marked as resolved. – Prem
Sure, Prem. Accepted! – peter