2016-09-14 56 views
2

以下Spark代碼正確地演示了我想要執行的操作,並使用小型演示數據集生成正確的輸出。Spark合併/組合數組inBy/aggregate

當我在大量生產數據上運行相同的一般類型的代碼時,我遇到了運行時問題。 Spark作業在我的羣集上運行約12小時並失敗。

只是看了一下下面的代碼,看起來沒有效率的爆炸每一行,只是將它合併回來。在給定的測試數據集中,第四行包含array_value_1中的三個值和array_value_2中的三個值,它們將爆炸爲3 * 3或9個分解行。

因此,在一個更大的數據集中,一個包含5個這樣的數組列的行,以及每列中的10個值會爆炸到10^5個爆炸行?

查看提供的Spark函數,沒有開箱即用的功能可以做我想做的事情。我可以提供用戶定義的功能。有沒有速度缺點呢?

val sparkSession = SparkSession.builder. 
    master("local") 
    .appName("merge list test") 
    .getOrCreate() 

val schema = StructType(
    StructField("category", IntegerType) :: 
    StructField("array_value_1", ArrayType(StringType)) :: 
    StructField("array_value_2", ArrayType(StringType)) :: 
    Nil) 

val rows = List(
    Row(1, List("a", "b"), List("u", "v")), 
    Row(1, List("b", "c"), List("v", "w")), 
    Row(2, List("c", "d"), List("w")), 
    Row(2, List("c", "d", "e"), List("x", "y", "z")) 
) 

val df = sparkSession.createDataFrame(rows.asJava, schema) 

val dfExploded = df. 
    withColumn("scalar_1", explode(col("array_value_1"))). 
    withColumn("scalar_2", explode(col("array_value_2"))) 

// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19 
logger.info(s"dfExploded.count()=${dfExploded.count()}") 

val dfOutput = dfExploded.groupBy("category").agg(
    collect_set("scalar_1").alias("combined_values_2"), 
    collect_set("scalar_2").alias("combined_values_2")) 

dfOutput.show() 

回答

6

這可能是低效的explode但從根本上試圖實現的操作僅僅是昂貴的。實際上,它只是另一個groupByKey,你可以在這裏做的不多,讓它變得更好。由於您使用的Spark> 2.0,你可以直接collect_list和扁平:

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten.distinct) 

df 
    .groupBy("category") 
    .agg(
    flatten(collect_list("array_value_1")), 
    flatten(collect_list("array_value_2")) 
) 

也可以使用custom Aggregator但我懷疑這些都將產生巨大的變化。

如果集是比較大的,你希望重複的顯著數量,你可以嘗試使用aggregateByKey與可變集:

import scala.collection.mutable.{Set => MSet} 

val rdd = df 
    .select($"category", struct($"array_value_1", $"array_value_2")) 
    .as[(Int, (Seq[String], Seq[String]))] 
    .rdd 

val agg = rdd 
    .aggregateByKey((MSet[String](), MSet[String]()))( 
    {case ((accX, accY), (xs, ys)) => (accX ++= xs, accY ++ ys)}, 
    {case ((accX1, accY1), (accX2, accY2)) => (accX1 ++= accX2, accY1 ++ accY2)} 
) 
    .mapValues { case (xs, ys) => (xs.toArray, ys.toArray) } 
    .toDF 
+1

一個簡單的扁平化UDF的第一個解決方案完全解決了該問題。 Spark從約12小時開始,直到在30分鐘內成功完成整個工作。觀看Spark監視器GUI,每個內部任務都會在一分鐘或更短時間內運行並完成。感謝您的幫助。 – clay

+0

我很高興聽到,雖然我不得不承認我很驚訝。我希望有一點小小的改進,但沒有那麼令人印象深刻個人名單有多大? – zero323