2017-03-09 50 views
2

我建立與pyspark一個數據幀,像這樣從1值到n:添加一列包括在數據幀

+----+------+ 
| k|  v| 
+----+------+ 
|key1|value1| 
|key1|value1| 
|key1|value1| 
|key2|value1| 
|key2|value1| 
|key2|value1| 
+----+------+ 

我要添加一個「的rowNum」列使用「withColumn」方法,結果數據幀如下更改:

+----+------+------+ 
| k|  v|rowNum| 
+----+------+------+ 
|key1|value1|  1| 
|key1|value1|  2| 
|key1|value1|  3| 
|key2|value1|  4| 
|key2|value1|  5| 
|key2|value1|  6| 
+----+------+------+ 

rowNum的範圍從1到n,n等於原始數。我修改了代碼,像這樣:

from pyspark.sql.window import Window 
from pyspark.sql import functions as F 
w = Window().partitionBy("v").orderBy('k') 
my_df= my_df.withColumn("rowNum", F.rowNumber().over(w)) 

但是,我得到錯誤信息:

'module' object has no attribute 'rowNumber' 

我換成ROWNUMBER()方法ROW_NUMBER,上面的代碼可以運行。但是,當我運行代碼:

my_df.show() 

我再次得到了錯誤信息:

Py4JJavaError: An error occurred while calling o898.showString. 
: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number() 
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224) 
    at org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate.doGenCode(interfaces.scala:342) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101) 
    at scala.Option.getOrElse(Option.scala:121) 
+0

這是最有可能的[這個](http://stackoverflow.com/questions/32086578/how-to-add-row-id-in-pyspark-dataframes)的誘惑。 –

回答

0

如果您需要需要從1連續rowNum值N,而不是monotonically_increasing_id可以使用zipWithIndex()

重新創建您的示例數據,如下所示:

rdd = sc.parallelize([('key1','value1'), 
         ('key1','value1'), 
         ('key1','value1'), 
         ('key1','value1'), 
         ('key1','value1'), 
         ('key1','value1')]) 

然後,您可以使用zipWithIndex()向每一行添加索引。該map用於格式化數據,所以它開始於1

rdd_indexed = rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1]+1)) 
df = rdd_indexed.toDF(['id','score','rowNum']) 
df.show() 


+----+------+------+ 
| id| score|rowNum| 
+----+------+------+ 
|key1|value1|  1| 
|key1|value1|  2| 
|key1|value1|  3| 
|key1|value1|  4| 
|key1|value1|  5| 
|key1|value1|  6| 
+----+------+------+ 
+0

我需要基於當前的數據框來添加這個新列。所以,我希望使用數據框的withColumn方法。 –

+0

'rdd'可以通過'df.rdd'訪問,允許您使用相同的概念。請注意,我建議使用的是將'monotonically_increasing_id'與'withColumn'結合使用,儘管這種方法不能保證順序ID。 – Jaco

1

爲此,您可以用windows

from pyspark.sql.window import Window 
from pyspark.sql.functions import rowNumber 
w = Window().orderBy() 
your_df= your_df.withColumn("rowNum", rowNumber().over(w)) 

這裏your_df 1添加到索引的數據幀,其中你需要這個專欄。

+1

我用你的代碼在我的程序中嘗試。我發現了一些問題:'模塊'對象沒有'rowNumber'屬性。所以,我發現了另一種方法row_number。 row_number可以運行。但是,當我運行代碼:your_df.show()。我收到錯誤消息。像這樣:引起:java.lang.UnsupportedOperationException:無法評估表達式:row_number() –

+0

您是否導入了rowNumber –

+0

而這些語句的準確率是100%,正如我在我的產品代碼中使用的那樣 –

相關問題