Skip to content

Commit

Permalink
[SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should ch…
Browse files Browse the repository at this point in the history
…eck child join type

### What changes were proposed in this pull request?
In our prod we meet a case
```
with
refund_info as (
      select
            b_key,
            1 as b_type
      from  default.table_b
),
next_month_time as (
      select /*+ broadcast(b, c) */
             c_key
           ,1 as c_time
      FROM default.table_c
)
select
        a.loan_id
        ,c.c_time
        ,b.type
from  (
    select
           a_key
    from default.table_a2
    union
   select
         a_key
   from  default.table_a1
) a
left join refund_info b  on a.loan_id = b.loan_id
left join next_month_time c on a.loan_id = c.loan_id
;
```
In this query, it inject table_b as table_c's runtime bloom filter, but table_b join condition is LEFT OUTER, causing table_c missing data.

Caused by
![image](https://github.com/apache/spark/assets/46485123/be45e211-23e4-4105-98b4-aa571c87665f)

InjectRuntimeFilter.extractSelectiveFilterOverScan(), when handle join, since left plan (a left outer join b's a) is a UNION then the extract result is NONE, then zip left/right keys to extract from join's right,  finnaly cause this issue.

### Why are the changes needed?
Fix data correctness issue

### Does this PR introduce _any_ user-facing change?
Yea, fix data incorrect issue

### How was this patch tested?
For the existed PR, it fix the wrong case

Before: It extract a LEFT_ANTI_JOIN's right child to the outside bf3....its not correct
```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
:     +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
:        +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926) AND might_contain(scalar-subquery#48719 [], xxhash64(c3#45926, 42)))
   :  +- Aggregate [bloom_filter_agg(xxhash64(c2#45920, 42), 1000000, 8388608, 0, 0) AS bloomFilter#48718]
   :     +- Project [c2#45920]
   :        +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
   :           +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
   +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```

After:
```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
:     +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
:        +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926))
   +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #46263 from AngersZhuuuu/SPARK-48027.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
AngersZhuuuu and cloud-fan committed May 7, 2024
1 parent 56fe185 commit b5e39be
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
currentPlan,
targetKey)
case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
case ExtractEquiJoinKeys(joinType, lkeys, rkeys, _, _, left, right, _) =>
// Runtime filters use one side of the [[Join]] to build a set of join key values and prune
// the other side of the [[Join]]. It's also OK to use a superset of the join key values
// (ignore null values) to do the pruning.
Expand All @@ -129,24 +129,40 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
if (left.output.exists(_.semanticEquals(targetKey))) {
extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false,
currentPlan = left, targetKey = targetKey).orElse {
// We can also extract from the right side if the join keys are transitive.
lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2)
.flatMap { newTargetKey =>
extract(right, AttributeSet.empty,
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,
targetKey = newTargetKey)
}
// We can also extract from the right side if the join keys are transitive, and
// the right side always produces a superset output of join left keys.
// Let's look at an example
// left table: 1, 2, 3
// right table, 3, 4
// left outer join output: (1, null), (2, null), (3, 3)
// left key output: 1, 2, 3
// Any join side always produce a superset output of its corresponding
// join keys, but for transitive join keys we need to check the join type.
if (canPruneLeft(joinType)) {
lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2)
.flatMap { newTargetKey =>
extract(right, AttributeSet.empty,
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,
targetKey = newTargetKey)
}
} else {
None
}
}
} else if (right.output.exists(_.semanticEquals(targetKey))) {
extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false,
currentPlan = right, targetKey = targetKey).orElse {
// We can also extract from the left side if the join keys are transitive.
rkeys.zip(lkeys).find(_._1.semanticEquals(targetKey)).map(_._2)
.flatMap { newTargetKey =>
extract(left, AttributeSet.empty,
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
targetKey = newTargetKey)
}
if (canPruneRight(joinType)) {
rkeys.zip(lkeys).find(_._1.semanticEquals(targetKey)).map(_._2)
.flatMap { newTargetKey =>
extract(left, AttributeSet.empty,
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
targetKey = newTargetKey)
}
} else {
None
}
}
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
"(bf1.c1 = bf2.c2 and bf2.a2 = 5)) as a join bf3 on bf3.c3 = a.c1", 2)
// left anti join unsupported.
// bf2 as creation side and inject runtime filter for bf3(by passing key).
assertRewroteWithBloomFilter("select * from (select * from bf1 left anti join bf2 on " +
"(bf1.c1 = bf2.c2 and bf2.a2 = 5)) as a join bf3 on bf3.c3 = a.c1")
assertDidNotRewriteWithBloomFilter("select * from (select * from bf1 left anti join bf2 " +
"on (bf1.c1 = bf2.c2 and bf2.a2 = 5)) as a join bf3 on bf3.c3 = a.c1")
// left anti join unsupported and hasn't selective filter.
assertRewroteWithBloomFilter("select * from (select * from bf1 left anti join bf2 on " +
"(bf1.c1 = bf2.c2 and bf1.a1 = 5)) as a join bf3 on bf3.c3 = a.c1", 0)
Expand Down

0 comments on commit b5e39be

Please sign in to comment.