2017-09-19 107 views
0

下面是我的火花數據幀pyspark動態列計算

a b c 
1 3 4 
2 0 0 
4 1 0 
2 2 0 

我的輸出應該如下

a b c 
1 3 4 
2 0 2 
4 1 -1 
2 2 3 

公式是prev(c)-b+a4-2+0=22-4+1=-1

誰能幫我跨越這個障礙?

+0

假設你已經嘗試做一些事情來解決這個問題。請告訴我們你想要做什麼。 – Grigoriy

+0

那麼,你的問題是什麼?你有精確的公式,你可以很容易地搜索如何獲得以前的值和如何總結字段 –

+0

是的我使用下面的方法, 方法創建了新的列c_new,它滯後1並做了c_new-a + b後來分析了值必須從先前生成的c_new值中動態獲取。 下面是代碼 DF = df.withColumn( 'c_new',func.lag(DF [ 'C'])。在(Window.partitionBy( 「A」)。ORDERBY( 「A」))) df = df.withColumn('Stock_New',(df ['c_new'] - stock_output_table ['a'])+ stock_output_table ['b']) 我被卡在這裏不知道如何動態地從c_new –

回答

2
from pyspark.sql.functions import lag, udf 
from pyspark.sql.types import IntegerType 
from pyspark.sql.window import Window 

numbers = [[1,2,3],[2,3,4],[3,4,5],[5,6,7]] 
df = sc.parallelize(numbers).toDF(['a','b','c']) 
df.show() 

w = Window().partitionBy().orderBy('a') 
calculate = udf(lambda a,b,c:a-b+c,IntegerType()) 
df = df.withColumn('result', lag("a").over(w)-df.b+df.c) 
df.show() 



+---+---+---+ 
| a| b| c| 
+---+---+---+ 
| 1| 2| 3| 
| 2| 3| 4| 
| 3| 4| 5| 
| 5| 6| 7| 
+---+---+---+ 

+---+---+---+------+ 
| a| b| c|result| 
+---+---+---+------+ 
| 1| 2| 3| null| 
| 2| 3| 4|  2| 
| 3| 4| 5|  3| 
| 5| 6| 7|  4| 
+---+---+---+------+ 
+0

非常感謝您的回答。但是,如果我的數據幀是這樣的話,這不起作用= [[1,1,2,3],[1,1,2,3],[2,2,3,4],[3,3,4 ,5],[3,3,4,5],[3,3,4,5],[4,5,6,7]] df = sc.parallelize(numbers).toDF(['cat' ,'a','b','c']) w = Window()。partitionBy()。orderBy('cat') df = df.withColumn('result',lag(「a」)。over (w)的-df.b + df.c)。我應該在最後採取任何明確的,請幫助 –

+0

它給了什麼錯誤? – StackPointer

+0

它工作得很好,給我...給定你的數據集 – StackPointer

0

希望這可能有幫助!

import pyspark.sql.functions as f 
from pyspark.sql.window import Window 

df = sc.parallelize([ 
    [1,3], 
    [2,0], 
    [4,1], 
    [2,2] 
]).toDF(('a', 'b')) 

df1 = df.withColumn("row_id", f.monotonically_increasing_id()) 
w = Window.partitionBy().orderBy(f.col("row_id")) 
df1 = df1.withColumn("c_temp", f.when(f.col("row_id")==0, f.lit(4)).otherwise(- f.col("a") + f.col("b"))) 
df1 = df1.withColumn("c", f.sum(f.col("c_temp")).over(w)).drop("c_temp","row_id") 
df1.show() 

輸出是:

+---+---+---+ 
| a| b| c| 
+---+---+---+ 
| 1| 3| 4| 
| 2| 0| 2| 
| 4| 1| -1| 
| 2| 2| -1| 
+---+---+---+