
Cumulative array from earlier rows (PySpark dataframe)

A (Python) example will make my question clear. Say I have a Spark dataframe of people who watched certain movies on certain dates, like this:

movierecord = spark.createDataFrame([
    ("Alice", 1, ["Avatar"]),
    ("Bob", 2, ["Fargo", "Tron"]),
    ("Alice", 4, ["Babe"]),
    ("Alice", 6, ["Avatar", "Airplane"]),
    ("Alice", 7, ["Pulp Fiction"]),
    ("Bob", 9, ["Star Wars"])],
    ["name", "unixdate", "movies"])

The schema and the dataframe defined by the above look as follows:

root 
|-- name: string (nullable = true) 
|-- unixdate: long (nullable = true) 
|-- movies: array (nullable = true) 
| |-- element: string (containsNull = true) 

+-----+--------+------------------+
|name |unixdate|movies            |
+-----+--------+------------------+
|Alice|1       |[Avatar]          |
|Bob  |2       |[Fargo, Tron]     |
|Alice|4       |[Babe]            |
|Alice|6       |[Avatar, Airplane]|
|Alice|7       |[Pulp Fiction]    |
|Bob  |9       |[Star Wars]       |
+-----+--------+------------------+

I'd like to generate a new dataframe column containing all of the movies previously seen by each user, without duplicates ("previously" per the unixdate field). So it should look like this:

+-----+--------+------------------+------------------------+
|name |unixdate|movies            |previous_movies         |
+-----+--------+------------------+------------------------+
|Alice|1       |[Avatar]          |[]                      |
|Bob  |2       |[Fargo, Tron]     |[]                      |
|Alice|4       |[Babe]            |[Avatar]                |
|Alice|6       |[Avatar, Airplane]|[Avatar, Babe]          |
|Alice|7       |[Pulp Fiction]    |[Avatar, Babe, Airplane]|
|Bob  |9       |[Star Wars]       |[Fargo, Tron]           |
+-----+--------+------------------+------------------------+

How can I achieve this in an efficient manner?

Answer


SQL only, without preserving the order of objects:

  • Required imports:

    import pyspark.sql.functions as f 
    from pyspark.sql.window import Window 
    
  • Window definition:

    w = Window.partitionBy("name").orderBy("unixdate") 
    
  • Full solution:

    (movierecord 
        # Flatten movies 
        .withColumn("previous_movie", f.explode("movies")) 
        # Collect unique 
        .withColumn("previous_movies", f.collect_set("previous_movie").over(w)) 
        # Drop duplicates for a single unixdate 
        .groupBy("name", "unixdate") 
        .agg(f.max(f.struct(
            f.size("previous_movies"),
            f.col("movies").alias("movies"),
            f.col("previous_movies").alias("previous_movies")
        )).alias("tmp")) 
        # Shift by one and extract 
        .select(
            "name", "unixdate", "tmp.movies",
            f.lag("tmp.previous_movies", 1).over(w).alias("previous_movies")))
    
  • Result (note the nulls; see the note after the table):

    +-----+--------+------------------+------------------------+
    |name |unixdate|movies            |previous_movies         |
    +-----+--------+------------------+------------------------+
    |Bob  |2       |[Fargo, Tron]     |null                    |
    |Bob  |9       |[Star Wars]       |[Fargo, Tron]           |
    |Alice|1       |[Avatar]          |null                    |
    |Alice|4       |[Babe]            |[Avatar]                |
    |Alice|6       |[Avatar, Airplane]|[Babe, Avatar]          |
    |Alice|7       |[Pulp Fiction]    |[Babe, Airplane, Avatar]|
    +-----+--------+------------------+------------------------+
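
The first row for each user comes back as null rather than the empty array the question asks for. If that distinction matters, one possible follow-up step (a sketch, not part of the original answer; `result` is a hypothetical name for the dataframe produced above) is to coalesce the lagged column with an empty array literal:

    # Assuming the expression above was assigned to `result`
    result_with_empty = result.withColumn(
        "previous_movies",
        # f.array() alone is typed array<null>, so cast it to array<string>
        f.coalesce(f.col("previous_movies"), f.array().cast("array<string>")))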

SQL with a Python UDF that maintains order:

  • Imports:

    import pyspark.sql.functions as f 
    from pyspark.sql.window import Window 
    from pyspark.sql import Column 
    from pyspark.sql.types import ArrayType, StringType 
    
    from typing import List, Union 
    
    # https://github.com/pytoolz/toolz 
    from toolz import unique, concat, compose 
    
  • UDF (see the short illustration after the result table below):

    def flatten_distinct(col: Union[Column, str]) -> Column: 
        def flatten_distinct_(xss: Union[List[List[str]], None]) -> List[str]: 
            return compose(list, unique, concat)(xss or []) 
        return f.udf(flatten_distinct_, ArrayType(StringType()))(col) 
    
  • Window definition, as before.

  • Full solution:

    (movierecord 
        # Collect lists 
        .withColumn("previous_movies", f.collect_list("movies").over(w)) 
        # Flatten and drop duplicates 
        .withColumn("previous_movies", flatten_distinct("previous_movies")) 
        # Shift by one 
        .withColumn("previous_movies", f.lag("previous_movies", 1).over(w)) 
        # For presentation only 
        .orderBy("unixdate")) 
    
  • Result:

    +-----+--------+------------------+------------------------+
    |name |unixdate|movies            |previous_movies         |
    +-----+--------+------------------+------------------------+
    |Alice|1       |[Avatar]          |null                    |
    |Bob  |2       |[Fargo, Tron]     |null                    |
    |Alice|4       |[Babe]            |[Avatar]                |
    |Alice|6       |[Avatar, Airplane]|[Avatar, Babe]          |
    |Alice|7       |[Pulp Fiction]    |[Avatar, Babe, Airplane]|
    |Bob  |9       |[Star Wars]       |[Fargo, Tron]           |
    +-----+--------+------------------+------------------------+
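
The helper relies on toolz: concat flattens the collected list of lists, unique keeps only the first occurrence of each title, and compose chains the steps. A quick, standalone illustration (not part of the original answer) of what flatten_distinct_ does to a collected window:

    from toolz import unique, concat, compose

    flatten = compose(list, unique, concat)

    # Duplicates are dropped and the order of first appearance is kept
    flatten([["Avatar"], ["Babe"], ["Avatar", "Airplane"]])
    # -> ['Avatar', 'Babe', 'Airplane']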

Performance

I believe there is no efficient way to solve this given the constraints. Not only does the requested output require significant data duplication (the data is binary-encoded to fit the Tungsten format, so you may get compression but you lose object identity), but many of the required operations, including grouping and sorting, are also expensive under Spark's computing model.

This should be fine if the expected size of previous_movies is bounded and small, but it isn't feasible in general.

The data duplication is fairly easy to address by keeping a single, lazy history per user. That isn't something you can do in SQL, but it is quite easy with low-level RDD operations, as sketched below.
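
A minimal sketch of that RDD approach (not from the original answer; with_previous and previous are names made up here): group each user's rows, sort them by unixdate, and walk them once while carrying the set of titles seen so far.

    def with_previous(rows):
        # rows: one user's rows, already sorted by unixdate
        seen, seen_set = [], set()
        for r in rows:
            yield (r["name"], r["unixdate"], r["movies"], list(seen))
            for m in r["movies"]:
                if m not in seen_set:
                    seen_set.add(m)
                    seen.append(m)

    previous = (movierecord.rdd
        # One group per user; assumes a single user's history fits in memory
        .groupBy(lambda r: r["name"])
        .flatMap(lambda kv: with_previous(sorted(kv[1], key=lambda r: r["unixdate"])))
        .toDF(["name", "unixdate", "movies", "previous_movies"]))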

The explode and collect_* pattern is expensive. If your requirements are strict but you want to improve performance, you can use a Scala UDF in place of the Python one.