2017-02-13 148 views
2

我有以下數據框:如何添加運行標識新列星火數據幀(pyspark)

timestamp \t  sum 
 
31/01/2017 09:00 \t 0 
 
31/01/2017 10:00 \t 0 
 
31/01/2017 11:00 \t 0 
 
31/01/2017 12:00 \t 2 
 
31/01/2017 13:00 \t 2 
 
31/01/2017 14:00 \t 2 
 
31/01/2017 15:00 \t 11

,並想添加一個新的ID列 - 只是一個流水號像即:

+----------------+---+---------+ 
 
|  timestamp|sum|running_id| 
 
+----------------+---+---------+ 
 
|2017-01-31 09:00| 0|  0| 
 
|2017-01-31 10:00| 0|  1| 
 
|2017-01-31 11:00| 0|  2| 
 
|2017-01-31 12:00| 2|  3| 
 
|2017-01-31 13:00| 2|  4| 
 
|2017-01-31 14:00| 2|  5| 
 
|2017-01-31 15:00| 11|  6|

我做了這樣的:

sub_data_spark = sub_data_spark.rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1])).toDF(sub_data_spark.columns+["running_id"])

有些人可以爲一個 「乾淨」 的方式諮詢?

感謝, 鮑里斯

回答

2

嘗試。也可以使用PARTITION BY子句。

+0

如果沒有添加PartitionBy子句,這將基本上將所有數據混洗到單個分區,這不會對並行性有好處。 –

+0

謝謝,這個工程。如何使用PartitionBy並保持時間戳的有序性(行的順序應該保持不變) – Boris

+0

從表中選擇*,row_Number()over(按TO_DATE分區(timestamp)順序)。這會將不同日期的所有數據發送到不同的分區。但是對於所有分區計數將從1開始。這種方法有其優點和缺點。根據你的用例使用它。 –

2

到的唯一方法沒有zipWithIndex或zipWithUniqueId你應該使用功能monotonically_increasing_id

此功能的工作原理是這樣的:

產生單調遞增64列位整數。

生成的ID保證是單調遞增的,並且 是唯一的,但不是連續的。當前的實施將 分區ID放在高31位中,並將每個 分區內的記錄號放在低33位中。假設數據幀 的分區少於10億個,並且每個分區的記錄少於8個 。

因此,對於你的情況,你可以使用這樣的:

sub_data_spark.withColumn('Id', monotonically_increasing_id()).show() 

這將返回給您的唯一ID爲您的模型。但它不會在0開始,並使用select *, row_Number() over (order by sum) from table

或基於邏輯的任何列將不連續

+0

謝謝,我確實尋找一個單調增加和唯一的數字=>行號 – Boris

+0

它確實有效,但如果您想要32位整數而不是64位,情況如何?這裏的截斷會導致問題。 –

+0

根據Spark代碼,您將使用64位長的:https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst /expressions/MonotonicallyIncreasingID.scala#L48 –