1
我是PySpark的新品牌,我試圖轉換一些派生新變量'COUNT_IDX'的python代碼。新變量的初始值爲1,但在條件滿足時會增加1。否則,新變量值將與最後一條記錄中的值相同。PySpark條件增量
條件遞增是當: TRIP_CD不等於先前記錄TRIP_CD 或 SIGN不等於先前記錄SIGN 或 time_diff不等於1
Python代碼(熊貓數據幀):
df['COUNT_IDX'] = 1
for i in range(1, len(df)):
if ((df['TRIP_CD'].iloc[i] != df['TRIP_CD'].iloc[i - 1])
or (df['SIGN'].iloc[i] != df['SIGN'].iloc[i-1])
or df['time_diff'].iloc[i] != 1):
df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1] + 1
else:
df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1]
這是預期的結果:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 3
2711 - 1 3
2854 - 1 4
2854 + 1 5
在PySpark,我初始化COUNT_IDX爲1。然後使用Window功能,我把TRIP_CD和SIGN的滯後和計算的time_diff,然後嘗試:
df = sqlContext.sql('''
select TRIP, TRIP_CD, SIGN, TIME_STAMP, seconds_diff,
case when TRIP_CD != TRIP_lag or SIGN != SIGN_lag or seconds_diff != 1
then (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))+1
else (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))
end as COUNT_INDEX from df''')
這是給我喜歡的東西:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 2
2711 - 1 1
2854 - 1 2
2854 + 1 2
如果在先前記錄上更新COUNT_IDX,則當前記錄上的COUNT_IDX不會識別要計算的更改。這就像COUNTI_IDX沒有被覆蓋,或者它不是按行進行評估。有關我如何解決此問題的任何想法?
這是一個創造性的解決方案,但是,我還沒有完全得到它的工作還沒有。你把這個在withColumn語句中創建一個新的累積總和列或這應該是SQL?謝謝! – Amber
這是爲了替代'case when'和'end'之間的SQL查詢。如果您願意,可以內聯窗口定義。由於數據中有一些缺失的列,所以顯示它只是僞代碼。 – zero323