-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check child join type #46263
Changes from 8 commits
d0b4cc5
b9e5f75
de85f8d
97132b7
3964de4
12518be
f8da323
adfdd3d
6e8ce1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,7 +120,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J | |
hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition), | ||
currentPlan, | ||
targetKey) | ||
case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) => | ||
case ExtractEquiJoinKeys(joinType, lkeys, rkeys, _, _, left, right, _) => | ||
// Runtime filters use one side of the [[Join]] to build a set of join key values and prune | ||
// the other side of the [[Join]]. It's also OK to use a superset of the join key values | ||
// (ignore null values) to do the pruning. | ||
|
@@ -129,24 +129,43 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J | |
if (left.output.exists(_.semanticEquals(targetKey))) { | ||
extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's clarify the expected strategy a bit more: For the exact join key match, like the left table here, it's always OK to generate the runtime filter using this left table, no matter what the join type is. This is because left table always produce a superset of output of the join output regarding the left keys. For transitive join key match, it's different. The right table here does not always generate a superset output regarding left keys. Let's look at an example
So we can't use right table to generate runtime filter. |
||
currentPlan = left, targetKey = targetKey).orElse { | ||
// We can also extract from the right side if the join keys are transitive. | ||
AngersZhuuuu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2) | ||
.flatMap { newTargetKey => | ||
extract(right, AttributeSet.empty, | ||
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right, | ||
targetKey = newTargetKey) | ||
} | ||
// For the exact join key match, like the left table here, it's always OK to generate | ||
// the runtime filter using this left table, no matter what the join type is. | ||
// This is because left table always produce a superset of output of the join output | ||
// regarding the left keys. | ||
// For transitive join key match, it's different. The right table here does | ||
// not always generate a superset output regarding left keys. | ||
// Let's look at an example | ||
// left table: 1, 2, 3 | ||
// right table, 3, 4 | ||
// left outer join output: (1, null), (2, null), (3, 3) | ||
// left keys: 1, 2, 3 | ||
// So we can't use right table to generate runtime filter. | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (canPruneLeft(joinType)) { | ||
lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2) | ||
.flatMap { newTargetKey => | ||
extract(right, AttributeSet.empty, | ||
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right, | ||
targetKey = newTargetKey) | ||
} | ||
} else { | ||
None | ||
} | ||
} | ||
} else if (right.output.exists(_.semanticEquals(targetKey))) { | ||
extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, | ||
currentPlan = right, targetKey = targetKey).orElse { | ||
// We can also extract from the left side if the join keys are transitive. | ||
rkeys.zip(lkeys).find(_._1.semanticEquals(targetKey)).map(_._2) | ||
.flatMap { newTargetKey => | ||
extract(left, AttributeSet.empty, | ||
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left, | ||
targetKey = newTargetKey) | ||
} | ||
if (canPruneRight(joinType)) { | ||
rkeys.zip(lkeys).find(_._1.semanticEquals(targetKey)).map(_._2) | ||
.flatMap { newTargetKey => | ||
extract(left, AttributeSet.empty, | ||
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left, | ||
targetKey = newTargetKey) | ||
} | ||
} else { | ||
None | ||
} | ||
} | ||
} else { | ||
None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm glad we described the idea in the comments. It's clear that for certain join types, the join child output is not a superset of the join output for transitive join keys.