0
我有一個數據幀中pyspark:添加列到數據幀和更新pyspark
ratings = spark.createDataFrame(
sc.textFile("transactions.json").map(lambda l: json.loads(l)),
)
ratings.show()
+--------+-------------------+------------+----------+-------------+-------+
|click_id| created_at| ip|product_id|product_price|user_id|
+--------+-------------------+------------+----------+-------------+-------+
| 123|2016-10-03 12:50:33| 10.10.10.10| 98373| 220.5| 1|
| 124|2017-02-03 11:51:33| 10.13.10.10| 97373| 320.5| 1|
| 125|2017-10-03 12:52:33| 192.168.2.1| 96373| 20.5| 1|
| 126|2017-10-03 13:50:33|172.16.11.10| 88373| 220.5| 2|
| 127|2017-10-03 13:51:33| 10.12.15.15| 87373| 320.5| 2|
| 128|2017-10-03 13:52:33|192.168.1.10| 86373| 20.5| 2|
| 129|2017-08-03 14:50:33| 10.13.10.10| 78373| 220.5| 3|
| 130|2017-10-03 14:51:33| 12.168.1.60| 77373| 320.5| 3|
| 131|2017-10-03 14:52:33| 10.10.30.30| 76373| 20.5| 3|
+--------+-------------------+------------+----------+-------------+-------+
ratings.registerTempTable("transactions")
final_df = sqlContext.sql("select * from transactions");
我想一個新列添加到一個名爲status
這個數據幀,然後更新基於created_at
和user_id
狀態欄。
的created_at
和user_id
從給定表transations
讀取並傳遞到功能get_status(user_id,created_at)
它返回status
。這status
需要被放入交易表作爲相應的新列user_id
和created_at
我可以在pyspark運行更改和更新命令嗎? 這怎麼可以使用pyspark完成?
的'created_at'和'user_id'從給定的表中讀取'transations '並傳遞給函數'get_status(user_id,created_at)'返回'status'。這個'status'需要作爲相應'user_id'和'created_at'的新列被放入事務表中 – Firstname