
[SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check child join type #46263

Closed
wants to merge 9 commits

Conversation

@AngersZhuuuu (Contributor) commented on Apr 28, 2024

What changes were proposed in this pull request?

In production we hit the following case:

```sql
with
refund_info as (
    select
        b_key as loan_id,
        1 as b_type
    from default.table_b
),
next_month_time as (
    select /*+ broadcast(b, c) */
        c_key as loan_id,
        1 as c_time
    from default.table_c
)
select
    a.loan_id,
    c.c_time,
    b.b_type
from (
    select a_key as loan_id from default.table_a2
    union
    select a_key as loan_id from default.table_a1
) a
left join refund_info b on a.loan_id = b.loan_id
left join next_month_time c on a.loan_id = c.loan_id;
```

In this query, the rule injects a runtime bloom filter built from table_b into table_c's scan. But table_b sits on the right side of a LEFT OUTER join, so its keys are not a superset of a's keys, and the filter incorrectly prunes table_c rows that should still join with a, causing missing data.

Caused by:

![image](https://github.com/apache/spark/assets/46485123/be45e211-23e4-4105-98b4-aa571c87665f)

In InjectRuntimeFilter.extractSelectiveFilterOverScan(), when handling the join, the left plan (the `a` side of `a LEFT OUTER JOIN b`) is a UNION, so extraction from it returns None. The rule then zips the left/right join keys and extracts from the join's right side instead, without checking the join type, which ultimately causes this issue.
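To make the failure mode concrete, here is a minimal standalone Scala sketch (hypothetical keys and table contents, not Spark code): the right side of a LEFT OUTER join does not cover all keys in the join output, so a filter built from it must not prune the other table.

```scala
// Minimal illustration of the correctness bug with hypothetical data (not Spark code).
object RuntimeFilterBugIllustration {
  def main(args: Array[String]): Unit = {
    val aKeys = Seq(1, 2, 3)                          // keys produced by the union side `a`
    val bKeys = Set(3)                                // keys of refund_info `b`
    val cRows = Seq(1 -> "c1", 2 -> "c2", 3 -> "c3")  // table_c rows keyed by loan_id

    // `a LEFT OUTER JOIN b` keeps every row of `a`, so the join still produces all of aKeys.
    val keysAfterLeftOuterJoin = aKeys

    // Buggy pruning: treat b's keys as a bloom-filter stand-in for c's scan.
    val buggyC = cRows.filter { case (k, _) => bKeys.contains(k) }
    // Safe pruning would only use a side whose keys cover the join output, e.g. `a` itself.
    val safeC = cRows.filter { case (k, _) => keysAfterLeftOuterJoin.contains(k) }

    println(s"buggy pruning : $buggyC") // List((3,c3))                  -> rows for keys 1 and 2 are lost
    println(s"safe pruning  : $safeC")  // List((1,c1), (2,c2), (3,c3))
  }
}
```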

Why are the changes needed?

Fixes a data correctness issue.

Does this PR introduce any user-facing change?

Yes, it fixes a data correctness issue.

How was this patch tested?

With the existing test case, this PR fixes the wrong plan.

Before: the rule extracted the right child of a LEFT ANTI join to build a bloom filter for the outer bf3 scan, which is not correct:

```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
:     +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
:        +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926) AND might_contain(scalar-subquery#48719 [], xxhash64(c3#45926, 42)))
   :  +- Aggregate [bloom_filter_agg(xxhash64(c2#45920, 42), 1000000, 8388608, 0, 0) AS bloomFilter#48718]
   :     +- Project [c2#45920]
   :        +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
   :           +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
   +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```

After:

```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
:     +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
:        +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926))
   +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```

Was this patch authored or co-authored using generative AI tooling?

NO

@github-actions github-actions bot added the SQL label Apr 28, 2024
@yaooqinn yaooqinn requested a review from cloud-fan April 28, 2024 11:50
```diff
@@ -120,34 +132,49 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
   currentPlan,
   targetKey)
- case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
+ case ExtractEquiJoinKeys(joinType, lkeys, rkeys, _, _, left, right, _) =>
+   // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+   // the other side of the [[Join]]. It's also OK to use a superset of the join key values
+   // (ignore null values) to do the pruning.
```
@cloud-fan (Contributor) commented on Apr 29, 2024:

I'm glad we described the idea in the comments. It's clear that for certain join types, the join child output is not a superset of the join output for transitive join keys.

```scala
// Runtime filters use one side of the [[Join]] to build a set of join key values and prune
// the other side of the [[Join]]. It's also OK to use a superset of the join key values
// (ignore null values) to do the pruning.
// We assume other rules have already pushed predicates through join if possible.
// So the predicate references won't pass on anymore.
if (left.output.exists(_.semanticEquals(targetKey))) {
  extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false,
```
@cloud-fan (Contributor) commented on Apr 29, 2024:

Let's clarify the expected strategy a bit more: for an exact join key match, like the left table here, it's always OK to generate the runtime filter from that table, no matter what the join type is. This is because the left table always produces a superset of the join output regarding the left keys.

For a transitive join key match it's different: the right table here does not always produce a superset of the output regarding the left keys. Let's look at an example:

left table: 1, 2, 3
right table: 3, 4
left outer join output: (1, null), (2, null), (3, 3)
left keys: 1, 2, 3

So we can't use the right table to generate the runtime filter.
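This matches the side-pruning rule Spark already encodes in JoinSelectionHelper's canPruneLeft/canPruneRight, which are mentioned later in this thread. A paraphrased sketch, not the verbatim source:

```scala
// Paraphrased sketch of the pruning rule in Spark's JoinSelectionHelper (not the verbatim
// source): which join types allow one side of a join to be pruned using key values
// collected from the other side.
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, LeftSemi, RightOuter}

object PruningSideRule {
  // The left side may be pruned with right-side keys only when every surviving
  // left row is guaranteed to have a matching right row.
  def canPruneLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftSemi | RightOuter => true
    case _ => false
  }

  // Symmetrically, the right side may be pruned with left-side keys.
  def canPruneRight(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftSemi | LeftOuter => true
    case _ => false
  }
}
```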

```scala
    hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,
    targetKey = newTargetKey)
}
if (canExtractLeft(joinType)) {
```
Contributor commented:

As I explained at https://github.com/apache/spark/pull/46263/files#r1582495367 , we don't need this check here.

@AngersZhuuuu (Contributor, author) replied:

Extracting from the equi-join keys makes sense. The ELSE branch also needs to remove canExtractLeft, right?

```scala
  }
} else if (right.output.exists(_.semanticEquals(targetKey))) {
  extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false,
    currentPlan = right, targetKey = targetKey).orElse {
    if (canExtractRight(joinType)) {
```
Contributor commented:

I think the code is wrong here. We only need this extra check for transitive join keys. In this branch, it's the left table.

@AngersZhuuuu (Contributor, author) replied:

Looks like we can just use canPruneLeft and canPruneRight; they follow the same rule.

@AngersZhuuuu (Contributor, author) replied:

Like the current version?

@cloud-fan (Contributor) commented on May 7, 2024:

The last commit only changes a code comment; merging to master, thanks!

@cloud-fan cloud-fan closed this in b5e39be May 7, 2024
@cloud-fan (Contributor) commented:
@AngersZhuuuu can you create a backport PR for 3.5? it has conflicts.

@AngersZhuuuu (Contributor, author) replied:
> @AngersZhuuuu can you create a backport PR for 3.5? it has conflicts.

https://issues.apache.org/jira/browse/SPARK-44649 introduced this issue; it only exists on branch-4.0, so there is no need to backport to 3.5.

JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
[SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check child join type
Closes apache#46263 from AngersZhuuuu/SPARK-48027.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request May 14, 2024
### What changes were proposed in this pull request?
This PR proposes to add comments for the other code branch.

### Why are the changes needed?
#46263 missed adding comments for the other code branch.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #46536 from beliefer/SPARK-48027_followup.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>