2017-04-18

I have two DataFrames, A and B. A is large (100G) while B is relatively small (100M). A has 8 partitions and B has 1. How can I speed up the broadcast join in Spark?

A.join(broadcast(B), $"cur" >= $"low" && $"cur" <= $"high", "left_outer") 

is very slow (> 10 hours).

However, if I change the join condition:

A.join(broadcast(B), $"cur" === $"low" , "left_outer") 

it becomes very fast (< 30 minutes). But the condition cannot be changed.

So is there any way to further improve the join speed under my original join condition?


They are of `Long` type – derek

Answer


The trick is to rewrite the join condition so it contains an = component that can be used to optimize the query and narrow down the possible matches. For numeric values, you bucketize your data and use buckets in the join condition.

Let's say your data looks like this:

val a = spark.range(100000) 
    .withColumn("cur", (rand(1) * 1000).cast("bigint")) 

val b = spark.range(100) 
    .withColumn("low", (rand(42) * 1000).cast("bigint")) 
    .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint")) 

First choose a bucket size that fits your data. In this case we can use 50:

val bucketSize = 50L 

Assign a bucket to each row from a:

val aBucketed = a.withColumn(
    "bucket", ($"cur"/bucketSize).cast("bigint") * bucketSize 
) 

Create a UDF which will emit the range of buckets:

def get_buckets(bucketSize: Long) = 
    udf((low: Long, high: Long) => { 
        val min = (low/bucketSize) * bucketSize 
        val max = (high/bucketSize) * bucketSize 
        (min to max by bucketSize).toSeq 
    }) 

and bucket b:

val bBucketed = b.withColumn(
    "bucket", explode(get_buckets(bucketSize)($"low", $"high")) 
) 

Use buckets in the join condition:

aBucketed.join(
    broadcast(bBucketed), 
    aBucketed("bucket") === bBucketed("bucket") && 
    $"cur" >= $"low" && 
    $"cur" <= $"high", 
    "leftouter" 
) 

This way Spark will use a BroadcastHashJoin:

*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L)) 
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double)/50.0) as bigint) * 50) AS bucket#184L] 
: +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L] 
:  +- *Range (0, 100000, step=1, splits=Some(8)) 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false])) 
    +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L] 
     +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L] 
     +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L] 
      +- *Range (0, 100, step=1, splits=Some(8)) 

instead of a BroadcastNestedLoopJoin:

== Physical Plan == 
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L)) 
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L] 
: +- *Range (0, 100000, step=1, splits=Some(8)) 
+- BroadcastExchange IdentityBroadcastMode 
    +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L] 
     +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L] 
     +- *Range (0, 100, step=1, splits=Some(8)) 

You can adjust the bucket size to balance between precision and data size.
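To see that tradeoff concretely, you can count how many bucket rows an interval in b explodes into, mirroring the arithmetic in the get_buckets UDF above (the helper name below is illustrative, not part of the answer):

```scala
// Number of bucket rows get_buckets emits for an interval [low, high]:
// one row per multiple of bucketSize between low/bucketSize and high/bucketSize.
def bucketsSpanned(low: Long, high: Long, bucketSize: Long): Long =
  high / bucketSize - low / bucketSize + 1

// A wide bucket keeps bBucketed small, but each bucket matches more rows of a
// that the range predicate must then filter out; narrow buckets explode b into
// more rows but pre-filter candidate matches more tightly.
val wide   = bucketsSpanned(120, 135, 50) // spans 1 bucket (100)
val narrow = bucketsSpanned(120, 135, 10) // spans 2 buckets (120, 130)
```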

If you don't mind a lower-level solution, then broadcast a sorted sequence with constant-time item access (like Array or Vector) and use a udf which performs binary search for joining.
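A plain-Scala sketch of that binary-search lookup, assuming b's intervals are sorted by low and non-overlapping (the Interval type and lookup helper here are hypothetical; in Spark you would broadcast the array and wrap lookup in a udf instead of joining):

```scala
// Broadcast-side lookup table: intervals sorted by `low`, non-overlapping.
case class Interval(low: Long, high: Long, id: Long)

// Binary search for the last interval with low <= cur, then check the upper bound.
def lookup(intervals: Array[Interval], cur: Long): Option[Long] = {
  var lo = 0
  var hi = intervals.length - 1
  var ans = -1
  while (lo <= hi) {
    val mid = (lo + hi) / 2
    if (intervals(mid).low <= cur) { ans = mid; lo = mid + 1 }
    else hi = mid - 1
  }
  if (ans >= 0 && cur <= intervals(ans).high) Some(intervals(ans).id) else None
}

// Example table, stand-in for the broadcast contents of b:
val table = Array(Interval(0, 9, 1), Interval(10, 19, 2), Interval(30, 39, 3))
```

Each probe is O(log n) over the broadcast array, instead of scanning every row of b as a nested-loop join would.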

You should also take a look at the number of partitions. 8 partitions for 100GB seems pretty low.