2016-03-09 100 views
2

我有一個PySpark數據幀與由添加列PySpark數據幀根據列值是否爲在另一列

[('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items') 

給定結構中我需要與根據是否「項目」 1或0添加另外的柱是否在'fav_items'中。

所以我想

[('u1', 1, [1 ,2, 3], 1), ('u1', 4, [1, 2, 3], 0)] 

我怎麼會擡頭了第二欄到第三欄來決定價值,我怎麼會再加入呢?

回答

6

以下代碼執行請求的任務。用戶定義的函數被定義爲接收兩列DataFrame作爲參數。因此,對於每一行,搜索一個項目是否在項目列表中。如果發現項目,1是回報,否則爲0。

# Imports 
from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 
# First we create a RDD in order to create a dataFrame: 
rdd = sc.parallelize([('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])]) 
df = rdd.toDF(['user', 'item', 'fav_items']) 
# Print dataFrame 
df.show() 

# We make an user define function that receives two columns and do operation 
function = udf(lambda item, items: 1 if item in items else 0, IntegerType()) 

df.select('user', 'item', 'fav_items', function(col('item'), col('fav_items')).alias('result')).show() 

下面的結果:

+----+----+---------+ 
|user|item|fav_items| 
+----+----+---------+ 
| u1| 1|[1, 2, 3]| 
| u1| 4|[1, 2, 3]| 
+----+----+---------+ 

+----+----+---------+------+ 
|user|item|fav_items|result| 
+----+----+---------+------+ 
| u1| 1|[1, 2, 3]|  1| 
| u1| 4|[1, 2, 3]|  0| 
+----+----+---------+------+ 
2

只是爲了好玩非UDF的解決方案:

from pyspark.sql.functions import col, first, explode, max as max_ 

result = (
    # Here we take exploded rows and for each row check if there 
    # is a match. We cast to integer (false -> 0, true -> 1) 
    # and take max (1 if there is any match) 
    max_((col("fav_item") == col("item")).cast("integer")) 
).alias("result") 


(df.repartition("user", "item") 
    # Explode array so we compare item and fav_item 
    .withColumn("fav_item", explode("fav_items")) 
    .groupBy("user", "item") 
    # Aggregate 
    # we add result and retain fav_items 
    .agg(result, first("fav_items").alias("fav_items"))) 

所以只是:

  • 解開fav_array:如果

    ## +----+----+---------+--------+ 
    ## |user|item|fav_items|fav_item| 
    ## +----+----+---------+--------+ 
    ## | u1| 1|[1, 2, 3]|  1| 
    ## | u1| 1|[1, 2, 3]|  2| 
    ## | u1| 1|[1, 2, 3]|  3| 
    ## | u1| 4|[1, 2, 3]|  1| 
    ## | u1| 4|[1, 2, 3]|  2| 
    ## | u1| 4|[1, 2, 3]|  3| 
    ## +----+----+---------+--------+ 
    
  • 檢查fav_item = item_1(col("fav_item") == col("item")).cast("integer")表達的結果):

    ## +----+----+---------+--------+---+ 
    ## |user|item|fav_items|fav_item| _1| 
    ## +----+----+---------+--------+---+ 
    ## | u1| 1|[1, 2, 3]|  1| 1| 
    ## | u1| 1|[1, 2, 3]|  2| 0| 
    ## | u1| 1|[1, 2, 3]|  3| 0| 
    ## | u1| 4|[1, 2, 3]|  1| 0| 
    ## | u1| 4|[1, 2, 3]|  2| 0| 
    ## | u1| 4|[1, 2, 3]|  3| 0| 
    ## +----+----+---------+--------+---+ 
    
  • 和回滾該保持useritem作爲組列,一個任意fav_items(所有是相同的)和最大的臨時列_1(0或1)。

我會用UDF去。

+0

非常聰明,但我不明白它先生零,你能給一個更深的解釋? –

+0

@AlbertoBonsanto當然,我簡化了代碼並添加了一些評論。 – zero323

相關問題