2017-03-14 51 views
1

例如, 來源:如何迭代轉置spark rdd的子矩陣?

+-----+-----+ 
|Date |val_1| 
+-----+-----+ 
| 1-1 | 1.1| 
| 1-2 | 1.2|  
| 1-3 | 1.3| 
| 1-4 | 1.4| 
| 1-5 | 1.5| 
| 1-6 | 1.6| 
| 1-7 | 1.7| 
| 1-8 | 1.8| 
| 1-9 | 1.9| 
| ...| ...| 

要:

+-----+-----+-----+-------+ 
| Date | D-3 | D-2 | D-1 | 
+-----+-----+-----+-------+ 
| 1-4 | 1.1 | 1.2 | 1.3 | 
| 1-5 | 1.2 | 1.3 | 1.4 | 
| 1-6 | 1.3 | 1.4 | 1.5 | 
| 1-7 | 1.4 | 1.5 | 1.6 | 
| 1-8 | 1.5 | 1.6 | 1.7 | 
| 1-9 | 1.6 | 1.7 | 1.8 | 
| ... | ... | ... | ... | 

非常感謝提前。

回答

2

你的問題並不完全清楚,尤其是對於你之後的迭代解決方案。然而,對於例如數據提供:

df = sc.parallelize([('1-1', 1.1), ('1-2', 1.2), ('1-3', 1.3), ('1-4', 1.4), ('1-5', 1.5), ('1-6', 1.6),('1-7', 1.7),('1-8', 1.8),('1-9', 1.9)]).toDF(["Date", "val_1"]) 

您可以結合使用lagWindow檢索D-3D-2D-1

from pyspark.sql.functions import lag, col 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col("Date")) 
dfl = df.select("Date", lag("val_1",count=3).over(w).alias("D-3"), 
        lag("val_1",count=2).over(w).alias("D-2"), 
        lag("val_1",count=1).over(w).alias("D-1")).na.drop() 
dfl.show()      

這將導致以下的輸出:

+----+---+---+---+ 
|Date|D-3|D-2|D-1| 
+----+---+---+---+ 
| 1-4|1.1|1.2|1.3| 
| 1-5|1.2|1.3|1.4| 
| 1-6|1.3|1.4|1.5| 
| 1-7|1.4|1.5|1.6| 
| 1-8|1.5|1.6|1.7| 
| 1-9|1.6|1.7|1.8| 
+----+---+---+---+ 
+0

雅科你好,非常感謝你的答案,它是真正有用的,我試着在「地圖」功能使用拉姆達但它的失敗,在這種情況下,似乎「窗口」和「滯後」功能更適合。 –

1

感謝雅科的靈感。 這裏是斯卡拉版本:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions 
val df = sc.parallelize(Seq(("1-1", 1.1), ("1-2", 1.2), ("1-3", 1.3), ("1-4", 1.4), ("1-5", 1.5), ("1-6", 1.6),("1-7", 1.7),("1-8", 1.8),("1-9", 1.9))).toDF("Date", "val_1") 
val w = Window.partitionBy().orderBy("Date") 
val res = df.withColumn("D-3", lag("val_1", 3, 0).over(w)).withColumn("D-2", lag("val_1", 2, 0).over(w)).withColumn("D-1", lag("val_1", 1, 0).over(w)).na.drop() 

結果:

+----+-----+---+---+---+ 
|Date|val_1|D-3|D-2|D-1| 
+----+-----+---+---+---+ 
| 1-4| 1.4|1.1|1.2|1.3| 
| 1-5| 1.5|1.2|1.3|1.4| 
| 1-6| 1.6|1.3|1.4|1.5| 
| 1-7| 1.7|1.4|1.5|1.6| 
| 1-8| 1.8|1.5|1.6|1.7| 
| 1-9| 1.9|1.6|1.7|1.8| 
+----+-----+---+---+---+ 
+0

不確定的性能,我試圖使用「df.select」,但它不支持使用「滯後」功能。 –