下面是我的火花數據幀pyspark動態列計算
a b c
1 3 4
2 0 0
4 1 0
2 2 0
我的輸出應該如下
a b c
1 3 4
2 0 2
4 1 -1
2 2 3
公式是prev(c)-b+a
即4-2+0=2
和2-4+1=-1
誰能幫我跨越這個障礙?
下面是我的火花數據幀pyspark動態列計算
a b c
1 3 4
2 0 0
4 1 0
2 2 0
我的輸出應該如下
a b c
1 3 4
2 0 2
4 1 -1
2 2 3
公式是prev(c)-b+a
即4-2+0=2
和2-4+1=-1
誰能幫我跨越這個障礙?
from pyspark.sql.functions import lag, udf
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
numbers = [[1,2,3],[2,3,4],[3,4,5],[5,6,7]]
df = sc.parallelize(numbers).toDF(['a','b','c'])
df.show()
w = Window().partitionBy().orderBy('a')
calculate = udf(lambda a,b,c:a-b+c,IntegerType())
df = df.withColumn('result', lag("a").over(w)-df.b+df.c)
df.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 2| 3| 4|
| 3| 4| 5|
| 5| 6| 7|
+---+---+---+
+---+---+---+------+
| a| b| c|result|
+---+---+---+------+
| 1| 2| 3| null|
| 2| 3| 4| 2|
| 3| 4| 5| 3|
| 5| 6| 7| 4|
+---+---+---+------+
非常感謝您的回答。但是,如果我的數據幀是這樣的話,這不起作用= [[1,1,2,3],[1,1,2,3],[2,2,3,4],[3,3,4 ,5],[3,3,4,5],[3,3,4,5],[4,5,6,7]] df = sc.parallelize(numbers).toDF(['cat' ,'a','b','c']) w = Window()。partitionBy()。orderBy('cat') df = df.withColumn('result',lag(「a」)。over (w)的-df.b + df.c)。我應該在最後採取任何明確的,請幫助 –
它給了什麼錯誤? – StackPointer
它工作得很好,給我...給定你的數據集 – StackPointer
希望這可能有幫助!
import pyspark.sql.functions as f
from pyspark.sql.window import Window
df = sc.parallelize([
[1,3],
[2,0],
[4,1],
[2,2]
]).toDF(('a', 'b'))
df1 = df.withColumn("row_id", f.monotonically_increasing_id())
w = Window.partitionBy().orderBy(f.col("row_id"))
df1 = df1.withColumn("c_temp", f.when(f.col("row_id")==0, f.lit(4)).otherwise(- f.col("a") + f.col("b")))
df1 = df1.withColumn("c", f.sum(f.col("c_temp")).over(w)).drop("c_temp","row_id")
df1.show()
輸出是:
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 3| 4|
| 2| 0| 2|
| 4| 1| -1|
| 2| 2| -1|
+---+---+---+
假設你已經嘗試做一些事情來解決這個問題。請告訴我們你想要做什麼。 – Grigoriy
那麼,你的問題是什麼?你有精確的公式,你可以很容易地搜索如何獲得以前的值和如何總結字段 –
是的我使用下面的方法, 方法創建了新的列c_new,它滯後1並做了c_new-a + b後來分析了值必須從先前生成的c_new值中動態獲取。 下面是代碼 DF = df.withColumn( 'c_new',func.lag(DF [ 'C'])。在(Window.partitionBy( 「A」)。ORDERBY( 「A」))) df = df.withColumn('Stock_New',(df ['c_new'] - stock_output_table ['a'])+ stock_output_table ['b']) 我被卡在這裏不知道如何動態地從c_new –