2017-04-12 74 views
0

這是星火2.1,鑑於此輸入文件:如何在Spark數據框中嵌套數組中的結構值?

order.json

{"id":1,"price":202.30,"userid":1} 
{"id":2,"price":343.99,"userid":1} 
{"id":3,"price":399.99,"userid":2} 

而下面dataframes:

val order = sqlContext.read.json("order.json") 
val df2 = order.select(struct("*") as 'order) 
val df3 = df2.groupBy("order.userId").agg(collect_list($"order").as("array")) 

DF3有以下內容:

+------+---------------------------+ 
|userId|array      | 
+------+---------------------------+ 
|1  |[[1,202.3,1], [2,343.99,1]]| 
|2  |[[3,399.99,2]]    | 
+------+---------------------------+ 

和結構:

root 
|-- userId: long (nullable = true) 
|-- array: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id: long (nullable = true) 
| | |-- price: double (nullable = true) 
| | |-- userid: long (nullable = true) 

現在假設我給出DF3:

  1. 我想計算array.price的總和爲每個用戶id,同時具有每用戶id陣列的優點行。

  2. 我會將此計算添加到生成的數據框的新列中。就像我做了df3.withColumn(「sum」,點亮(0)),但點亮(0)代替我的計算。

它會假設是直截了當的,但我堅持兩個。我沒有找到任何方法來訪問整個數組每行的計算(例如foldLeft)。

回答

1

我想計算array.price的總和爲每個用戶id,同時具有陣列

的優勢,遺憾的是具有陣列工作在這裏對你。 Spark SQL和DataFrame都不提供可直接用於處理任意大小數組上的任務而不首先分解(explode)的工具。

您可以使用UDF:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{col, udf} 

val totalPrice = udf((xs: Seq[Row]) => xs.map(_.getAs[Double]("price")).sum) 
df3.withColumn("totalPrice", totalPrice($"array")) 
+------+--------------------+----------+ 
|userId|    array|totalPrice| 
+------+--------------------+----------+ 
|  1|[[1,202.3,1], [2,...| 546.29| 
|  2|  [[3,399.99,2]]| 399.99| 
+------+--------------------+----------+ 

或轉換爲靜態類型Dataset

df3 
    .as[(Long, Seq[(Long, Double, Long)])] 
    .map{ case (id, xs) => (id, xs, xs.map(_._2).sum) } 
    .toDF("userId", "array", "totalPrice").show 
+------+--------------------+----------+ 
|userId|    array|totalPrice| 
+------+--------------------+----------+ 
|  1|[[1,202.3,1], [2,...| 546.29| 
|  2|  [[3,399.99,2]]| 399.99| 
+------+--------------------+----------+ 

正如上面提到你分解和聚合:

import org.apache.spark.sql.functions.{sum, first} 

df3 
    .withColumn("price", explode($"array.price")) 
    .groupBy($"userId") 
    .agg(sum($"price"), df3.columns.tail.map(c => first(c).alias(c)): _*) 
+------+----------+--------------------+ 
|userId|sum(price)|    array| 
+------+----------+--------------------+ 
|  1| 546.29|[[1,202.3,1], [2,...| 
|  2| 399.99|  [[3,399.99,2]]| 
+------+----------+--------------------+ 

但它是昂貴的,不使用現有的結構。

有一個醜陋的技巧,你可以使用:

import org.apache.spark.sql.functions.{coalesce, lit, max, size} 

val totalPrice = (0 to df3.agg(max(size($"array"))).as[Int].first) 
    .map(i => coalesce($"array.price".getItem(i), lit(0.0))) 
    .foldLeft(lit(0.0))(_ + _) 

df3.withColumn("totalPrice", totalPrice) 
+------+--------------------+----------+ 
|userId|    array|totalPrice| 
+------+--------------------+----------+ 
|  1|[[1,202.3,1], [2,...| 546.29| 
|  2|  [[3,399.99,2]]| 399.99| 
+------+--------------------+----------+ 

,但它比真正的解決方案更多的是好奇。

相關問題