pyspark: change the day in a datetime column
What is wrong with this code, which tries to change the day of a datetime column?
import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes
import datetime
sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)
rdd = sc.parallelize([('a', datetime.datetime(2014, 1, 9, 0, 0)),
                      ('b', datetime.datetime(2014, 1, 27, 0, 0)),
                      ('c', datetime.datetime(2014, 1, 31, 0, 0))])
testdf = sqlcontext.createDataFrame(rdd, ["id", "date"])
# show() and printSchema() print directly and return None, so no print() wrapper is needed
testdf.show()
testdf.printSchema()
This gives the test DataFrame:
+---+--------------------+
| id|                date|
+---+--------------------+
|  a|2014-01-09 00:00:...|
|  b|2014-01-27 00:00:...|
|  c|2014-01-31 00:00:...|
+---+--------------------+
root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)
Then I define a UDF to change the day of the date column:
def change_day_(date, day):
    return date.replace(day=day)
change_day = sf.udf(change_day_, sparktypes.TimestampType())
testdf.withColumn("PaidMonth", change_day(testdf.date, 1)).show(1)
This raises an error:
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Maybe try replacing '1' with 'lit(1)' (in the call to 'change_day'), after doing 'from pyspark.sql.functions import lit'? –
Thanks! This works! – muon
@ArthurTacca can you explain why? – muon
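On the "why": the traceback suggests that PySpark tried to turn every argument of the UDF call into a column. The bare Python int 1 was handed to functions.col, which expects a column name, hence "Method col([class java.lang.Integer]) does not exist". lit(1) wraps the constant in a literal column instead. An alternative, sketched below on the assumption that the day is a fixed constant, is to bind it on the Python side so the UDF receives only the column. The name make_change_day is hypothetical, and the Spark wiring appears only in comments, so the runnable part needs nothing beyond the standard library.

```python
import datetime


def make_change_day(day):
    """Return a one-argument function that sets the day of a datetime.

    make_change_day is a hypothetical helper name, not part of PySpark.
    """
    def change_day_(date):
        # A UDF may receive None for null rows, so pass nulls through.
        if date is None:
            return None
        return date.replace(day=day)
    return change_day_


# Plain-Python check of the bound function.
first_of_month = make_change_day(1)
print(first_of_month(datetime.datetime(2014, 1, 27, 0, 0)))

# Assumed Spark wiring (not executed here; uses the names from the question):
#   change_day = sf.udf(make_change_day(1), sparktypes.TimestampType())
#   testdf.withColumn("PaidMonth", change_day(testdf.date)).show(1)
```

With this shape the call site passes only testdf.date, so nothing other than a column reaches the UDF call. The lit(1) fix from the comment remains the smaller change when the day has to vary per call.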