Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should ch…
…eck child join type ### What changes were proposed in this pull request? In our prod we meet a case ``` with refund_info as ( select b_key, 1 as b_type from default.table_b ), next_month_time as ( select /*+ broadcast(b, c) */ c_key ,1 as c_time FROM default.table_c ) select a.loan_id ,c.c_time ,b.type from ( select a_key from default.table_a2 union select a_key from default.table_a1 ) a left join refund_info b on a.loan_id = b.loan_id left join next_month_time c on a.loan_id = c.loan_id ; ``` In this query, it inject table_b as table_c's runtime bloom filter, but table_b join condition is LEFT OUTER, causing table_c missing data. Caused by ![image](https://github.com/apache/spark/assets/46485123/be45e211-23e4-4105-98b4-aa571c87665f) InjectRuntimeFilter.extractSelectiveFilterOverScan(), when handle join, since left plan (a left outer join b's a) is a UNION then the extract result is NONE, then zip left/right keys to extract from join's right, finnaly cause this issue. ### Why are the changes needed? Fix data correctness issue ### Does this PR introduce _any_ user-facing change? Yea, fix data incorrect issue ### How was this patch tested? For the existed PR, it fix the wrong case Before: It extract a LEFT_ANTI_JOIN's right child to the outside bf3....its not correct ``` Join Inner, (c3#45926 = c1#45914) :- Join LeftAnti, (c1#45914 = c2#45920) : :- Filter isnotnull(c1#45914) : : +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet : +- Project [c2#45920] : +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920)) : +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet +- Filter (isnotnull(c3#45926) AND might_contain(scalar-subquery#48719 [], xxhash64(c3#45926, 42))) : +- Aggregate [bloom_filter_agg(xxhash64(c2#45920, 42), 1000000, 8388608, 0, 0) AS bloomFilter#48718] : +- Project [c2#45920] : +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920)) : +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet ``` After: ``` Join Inner, (c3#45926 = c1#45914) :- Join LeftAnti, (c1#45914 = c2#45920) : :- Filter isnotnull(c1#45914) : : +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet : +- Project [c2#45920] : +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920)) : +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet +- Filter (isnotnull(c3#45926)) +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet ``` ### Was this patch authored or co-authored using generative AI tooling? NO Closes apache#46263 from AngersZhuuuu/SPARK-48027. Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
- Loading branch information