2017-10-16 31 views
0

我有一個數據幀中pyspark:添加列到數據幀和更新pyspark

ratings = spark.createDataFrame(
    sc.textFile("transactions.json").map(lambda l: json.loads(l)), 
) 
ratings.show() 

+--------+-------------------+------------+----------+-------------+-------+ 
|click_id|   created_at|   ip|product_id|product_price|user_id| 
+--------+-------------------+------------+----------+-------------+-------+ 
|  123|2016-10-03 12:50:33| 10.10.10.10|  98373|  220.5|  1| 
|  124|2017-02-03 11:51:33| 10.13.10.10|  97373|  320.5|  1| 
|  125|2017-10-03 12:52:33| 192.168.2.1|  96373|   20.5|  1| 
|  126|2017-10-03 13:50:33|172.16.11.10|  88373|  220.5|  2| 
|  127|2017-10-03 13:51:33| 10.12.15.15|  87373|  320.5|  2| 
|  128|2017-10-03 13:52:33|192.168.1.10|  86373|   20.5|  2| 
|  129|2017-08-03 14:50:33| 10.13.10.10|  78373|  220.5|  3| 
|  130|2017-10-03 14:51:33| 12.168.1.60|  77373|  320.5|  3| 
|  131|2017-10-03 14:52:33| 10.10.30.30|  76373|   20.5|  3| 
+--------+-------------------+------------+----------+-------------+-------+ 

ratings.registerTempTable("transactions") 
final_df = sqlContext.sql("select * from transactions"); 

我想一個新列添加到一個名爲status這個數據幀,然後更新基於created_atuser_id狀態欄。

created_atuser_id從給定表transations讀取並傳遞到功能get_status(user_id,created_at)它返回status。這status需要被放入交易表作爲相應的新列user_idcreated_at

我可以在pyspark運行更改和更新命令嗎? 這怎麼可以使用pyspark完成?

回答

0

目前還不清楚你想要做什麼。你應該檢查出window functions他們允許你比較,總結...框架中的行。

例如

import pyspark.sql.functions as psf 
from pyspark.sql import Window 
w = Window.partitionBy("user_id").orderBy(psf.desc("created_at")) 
ratings.withColumn(
    "status", 
    psf.when(psf.row_number().over(w) == 1, "active").otherwise("inactive")).sort("click_id").show() 

+--------+-------------------+------------+----------+-------------+-------+--------+ 
|click_id|   created_at|   ip|product_id|product_price|user_id| status| 
+--------+-------------------+------------+----------+-------------+-------+--------+ 
|  123|2016-10-03 12:50:33| 10.10.10.10|  98373|  220.5|  1|inactive| 
|  124|2017-02-03 11:51:33| 10.13.10.10|  97373|  320.5|  1|inactive| 
|  125|2017-10-03 12:52:33| 192.168.2.1|  96373|   20.5|  1| active| 
|  126|2017-10-03 13:50:33|172.16.11.10|  88373|  220.5|  2|inactive| 
|  127|2017-10-03 13:51:33| 10.12.15.15|  87373|  320.5|  2|inactive| 
|  128|2017-10-03 13:52:33|192.168.1.10|  86373|   20.5|  2| active| 
|  129|2017-08-03 14:50:33| 10.13.10.10|  78373|  220.5|  3|inactive| 
|  130|2017-10-03 14:51:33| 12.168.1.60|  77373|  320.5|  3|inactive| 
|  131|2017-10-03 14:52:33| 10.10.30.30|  76373|   20.5|  3| active| 
+--------+-------------------+------------+----------+-------------+-------+--------+ 

如果你想傳遞一個UDF從兩個現有的創建新列它給你的每一個用戶的最後一次點擊

。 假設你有一個函數,它的user_idcreated_at作爲參數

from pyspark.sql.types import * 
def get_status(user_id,created_at): 
    ... 

get_status_udf = psf.udf(get_status, StringType()) 

StringType()或任何數據類型的函數輸出

ratings.withColumn("status", get_status_udf("user_id", "created_at")) 
+0

的'created_at'和'user_id'從給定的表中讀取'transations '並傳遞給函數'get_status(user_id,created_at)'返回'status'。這個'status'需要作爲相應'user_id'和'created_at'的新列被放入事務表中 – Firstname