The trick is to rewrite the join condition so that it contains an = component, which can be used to optimize the query and narrow down the possible matches. For numeric values, you bucketize your data and use the buckets in the join condition.

Let's say your data looks like this:
val a = spark.range(100000)
  .withColumn("cur", (rand(1) * 1000).cast("bigint"))

val b = spark.range(100)
  .withColumn("low", (rand(42) * 1000).cast("bigint"))
  .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))
First choose a bucket size appropriate for your data. In this case we can use 50:
val bucketSize = 50L
Assign a bucket to each row from a:
val aBucketed = a.withColumn(
  "bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize
)
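The bucket is just the value floored to the nearest multiple of the bucket size. A minimal plain-Python sketch of the same formula (Python stands in here only to illustrate the Scala column expression above):

```python
# Bucket assignment used above: floor(cur / bucket_size) * bucket_size.
bucket_size = 50

def bucket(cur: int) -> int:
    # For non-negative values, integer division truncates like the
    # bigint cast in the Scala expression.
    return (cur // bucket_size) * bucket_size

print(bucket(123))  # 100
print(bucket(49))   # 0
```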
Create a UDF which will emit the range of buckets overlapped by an interval:
def get_buckets(bucketSize: Long) =
  udf((low: Long, high: Long) => {
    val min = (low / bucketSize) * bucketSize
    val max = (high / bucketSize) * bucketSize
    (min to max by bucketSize).toSeq
  })
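The UDF emits every bucket that the closed interval [low, high] touches; a plain-Python sketch of the same logic (again, Python only stands in for the Scala UDF):

```python
# Emit every bucket overlapped by the closed interval [low, high],
# mirroring the get_buckets UDF above.
bucket_size = 50

def get_buckets(low: int, high: int) -> list:
    lo = (low // bucket_size) * bucket_size
    hi = (high // bucket_size) * bucket_size
    return list(range(lo, hi + 1, bucket_size))

print(get_buckets(123, 187))  # [100, 150]
```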
and bucket b:
val bBucketed = b.withColumn(
  "bucket", explode(get_buckets(bucketSize)($"low", $"high"))
)
Use the buckets in the join condition:
aBucketed.join(
  broadcast(bBucketed),
  aBucketed("bucket") === bBucketed("bucket") &&
    $"cur" >= $"low" &&
    $"cur" <= $"high",
  "leftouter"
)
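The bucket equality is safe as a pre-filter: any cur inside [low, high] necessarily falls into one of the buckets emitted for that interval, so narrowing the join this way never drops a true match. A quick plain-Python property check of the two formulas above:

```python
import random

bucket_size = 50

def bucket(cur):
    return (cur // bucket_size) * bucket_size

def get_buckets(low, high):
    lo = (low // bucket_size) * bucket_size
    hi = (high // bucket_size) * bucket_size
    return range(lo, hi + 1, bucket_size)

random.seed(0)
for _ in range(1000):
    low = random.randrange(1000)
    high = low + random.randrange(10)
    cur = random.randrange(low, high + 1)
    # A row of a that satisfies low <= cur <= high always shares
    # a bucket with the interval, so the equality never loses it.
    assert bucket(cur) in get_buckets(low, high)
```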
This way Spark will use a BroadcastHashJoin:
*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double)/50.0) as bigint) * 50) AS bucket#184L]
: +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
: +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
+- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
+- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
+- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
+- *Range (0, 100, step=1, splits=Some(8))
instead of a BroadcastNestedLoopJoin:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
: +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
+- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
+- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
+- *Range (0, 100, step=1, splits=Some(8))
You can adjust the bucket size to balance between precision and data size.
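That trade-off can be made concrete: smaller buckets mean more exploded rows on the b side (a bigger broadcast), while larger buckets produce more same-bucket pairs that fail the range filter. A rough plain-Python count over hypothetical intervals shaped like the example data (width under 10):

```python
import random

random.seed(1)

# Hypothetical intervals shaped like the example data: width under 10.
lows = [random.randrange(1000) for _ in range(10000)]
intervals = [(low, low + random.randrange(10)) for low in lows]

def exploded_rows(low, high, bs):
    # Buckets overlapped by [low, high] = rows emitted by the explode.
    return (high // bs) - (low // bs) + 1

avg = {bs: sum(exploded_rows(lo, hi, bs) for lo, hi in intervals) / len(intervals)
       for bs in (5, 50, 500)}
print(avg)  # smaller bucket size -> more exploded rows to broadcast
```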
If you don't mind a lower-level solution, broadcast a sorted sequence with constant-time access (like an Array or a Vector) and use a udf with binary search for joining.
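A minimal plain-Python sketch of that lower-level approach, assuming the small table holds non-overlapping intervals sorted by low (bisect stands in for the binary search inside a Scala udf, and the interval list is a hypothetical stand-in for the broadcast variable):

```python
import bisect

# Hypothetical broadcast content: non-overlapping (low, high, id)
# intervals sorted by low.
intervals = [(0, 9, "a"), (20, 29, "b"), (50, 64, "c")]
lows = [low for low, _, _ in intervals]

def lookup(cur):
    # Binary search for the rightmost interval starting at or before cur,
    # then verify cur actually falls inside it.
    i = bisect.bisect_right(lows, cur) - 1
    if i >= 0 and intervals[i][0] <= cur <= intervals[i][1]:
        return intervals[i][2]
    return None  # left-outer semantics: no match

print(lookup(25))  # b
print(lookup(15))  # None
```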
You should also take a look at the number of partitions. 8 partitions for 100GB seems very low.
They are of type Long – derek