0
我有下面的代碼創建如同下面的數據幀:解析數據FRAM添加新的列和更新列pyspark
ratings = spark.createDataFrame(
sc.textFile("myfile.json").map(lambda l: json.loads(l)),
)
ratings.registerTempTable("mytable")
final_df = sqlContext.sql("select * from mytable");
The data frame look something like this
我存儲的created_at
和user_id
成清單:
user_id_list = final_df.select('user_id').rdd.flatMap(lambda x: x).collect()
created_at_list = final_df.select('created_at').rdd.flatMap(lambda x: x).collect()
,並通過列表中的一個解析調用另一個功能:
for i in range(len(user_id_list)):
status=get_status(user_id_list[I],created_at_list[I])
我想在我的稱爲狀態數據幀創建一個新的列,並更新相應的user_id_list
和created_at_list value
我知道我需要使用此功能的價值 - 但不知道如何着手
final_df.withColumn('status', 'give the condition here')