以下Spark代碼正確地演示了我想要執行的操作,並使用小型演示數據集生成正確的輸出。Spark合併/組合數組inBy/aggregate
當我在大量生產數據上運行相同的一般類型的代碼時,我遇到了運行時問題。 Spark作業在我的羣集上運行約12小時並失敗。
只是看了一下下面的代碼,看起來沒有效率的爆炸每一行,只是將它合併回來。在給定的測試數據集中,第四行包含array_value_1中的三個值和array_value_2中的三個值,它們將爆炸爲3 * 3或9個分解行。
因此,在一個更大的數據集中,一個包含5個這樣的數組列的行,以及每列中的10個值會爆炸到10^5個爆炸行?
查看提供的Spark函數,沒有開箱即用的功能可以做我想做的事情。我可以提供用戶定義的功能。有沒有速度缺點呢?
val sparkSession = SparkSession.builder.
master("local")
.appName("merge list test")
.getOrCreate()
val schema = StructType(
StructField("category", IntegerType) ::
StructField("array_value_1", ArrayType(StringType)) ::
StructField("array_value_2", ArrayType(StringType)) ::
Nil)
val rows = List(
Row(1, List("a", "b"), List("u", "v")),
Row(1, List("b", "c"), List("v", "w")),
Row(2, List("c", "d"), List("w")),
Row(2, List("c", "d", "e"), List("x", "y", "z"))
)
val df = sparkSession.createDataFrame(rows.asJava, schema)
val dfExploded = df.
withColumn("scalar_1", explode(col("array_value_1"))).
withColumn("scalar_2", explode(col("array_value_2")))
// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19
logger.info(s"dfExploded.count()=${dfExploded.count()}")
val dfOutput = dfExploded.groupBy("category").agg(
collect_set("scalar_1").alias("combined_values_2"),
collect_set("scalar_2").alias("combined_values_2"))
dfOutput.show()
一個簡單的扁平化UDF的第一個解決方案完全解決了該問題。 Spark從約12小時開始,直到在30分鐘內成功完成整個工作。觀看Spark監視器GUI,每個內部任務都會在一分鐘或更短時間內運行並完成。感謝您的幫助。 – clay
我很高興聽到,雖然我不得不承認我很驚訝。我希望有一點小小的改進,但沒有那麼令人印象深刻個人名單有多大? – zero323