Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable push-down of constant join conditions in outer-joins #15760

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

mkleen
Copy link
Contributor

@mkleen mkleen commented Mar 25, 2024

Summary of the changes / Why this improves CrateDB

This is based on @jeeminso previous work on joins.

At the current state we have the optimizer rule optimizer_move_constant_join_conditions_beneath_join which pushes down constant join conditions beyond an inner-join:

Join[INNER | ((x = y) AND (y > 1))]
  ├ Collect[doc.t1 | [x] | true]
  └ Collect[doc.t2 | [y] | true]

becomes:

Join[INNER | (x = y)]
  ├ Collect[doc.t1 | [x] | true]
  └ Filter[(y > 1)]
     └ Collect[doc.t2 | [y] | true]

We also have already optimizer_move_filter_beneath_join which pushes filters beyond inner/outer/cross/nested joins:

Filter[(y > 1)] 
└ Join[INNER | (x = x)]    
    ├ Collect[doc.t1 | [x] | true]
    └ Collect[doc.t2 | [x] | true]

becomes:

Join[INNER | (x = y)]
  ├ Collect[doc.t1 | [] | true]
  └ Filter[(y > 1)]
     └ Collect[doc.t2 | [] | true]	 

This pr changes optimizer_move_constant_join_conditions_beneath_join so the constant join condition is extracted to a filter on top of the join which can then be pushed by optimizer_move_filter_beneath_join:

Join[RIGHT | ((x = y) AND (y > 1))]
  ├ Collect[doc.t1 | [x] | true]
  └ Collect[doc.t2 | [y] | true]

becomes:

Filter[(y > 1)] 
└ Join[RIGHT | (x = y)]    
    ├ Collect[doc.t1 | [x] | true]
    └ Collect[doc.t2 | [y] | true]

to:

Join[RIGHT | (x = y)]
  ├ Collect[doc.t1 | [x] | true]
  └ Filter[(y > 1)]
     └ Collect[doc.t2 | [y] | true]	 	

The benefit of this change is:

  • Constant join condition can now also be pushed to outer-joins not only inner-joins
  • Constant join conditions can be pushed through nested joins which was not possible before
  • Push down logic is not duplicated anymore
  • Outer-joins may become outer-equi-joins and can be converted to inner-joins

Checklist

  • Added an entry in the latest docs/appendices/release-notes/<x.y.0>.rst for user facing changes
  • Updated documentation & sql_features table for user facing changes
  • Touched code is covered by tests
  • CLA is signed
  • This does not contain breaking changes, or if it does:
    • It is released within a major release
    • It is recorded in the latest docs/appendices/release-notes/<x.y.0>.rst
    • It was marked as deprecated in an earlier release if possible
    • You've thought about the consequences and other components are adapted
      (E.g. AdminUI)

@mkleen mkleen force-pushed the mkleen/constant-outer-joins branch 5 times, most recently from 3977a0d to 4e86d49 Compare March 26, 2024 19:25
@mkleen mkleen force-pushed the mkleen/constant-outer-joins branch from 4e86d49 to 544df0c Compare April 8, 2024 09:24
@mkleen mkleen force-pushed the mkleen/constant-outer-joins branch from 544df0c to d54d92c Compare April 8, 2024 10:26
" │ └ Collect[doc.t2 | [cluster_id] | (kind = 'bar')]",
" └ Rename[cluster_id, kind] AS temp",
" └ Collect[doc.t2 | [cluster_id, kind] | true]"
"Eval[id, reference]",
Copy link
Contributor Author

@mkleen mkleen Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example of why this change makes sense. The constant join condition from the left-join gets extracted and pushed down and the outer-join can be converted to an inner-join which leads to a more efficient query plan using solely hash-joins.

+------------------------------------------------------+----------------------------------------------------------------------------------------+
| STEP                                                 | QUERY PLAN                                                                             |
+------------------------------------------------------+----------------------------------------------------------------------------------------+
| Initial logical plan                                 | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ Join[LEFT | ((cluster_id = id) AND (kind = 'bar'))] (rows=unknown)                 |
|                                                      |     ├ Join[INNER | ((cluster_id = id) AND (kind = 'bar'))] (rows=unknown)              |
|                                                      |     │  ├ Filter[(reference = 'bazinga')] (rows=0)                                      |
|                                                      |     │  │  └ Join[INNER | (subscription_id = id)] (rows=unknown)                        |
|                                                      |     │  │    ├ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                  |
|                                                      |     │  │    └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)            |
|                                                      |     │  └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                    |
|                                                      |     └ Rename[cluster_id, kind] AS temp (rows=unknown)                                  |
|                                                      |       └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                     |
| optimizer_extract_constant_join_conditions_to_filter | Eval[id, reference] (rows=0)                                                           |
|                                                      |   └ Filter[(kind = 'bar')] (rows=0)                                                    |
|                                                      |     └ Join[LEFT | (cluster_id = id)] (rows=unknown)                                    |
|                                                      |       ├ Join[INNER | ((cluster_id = id) AND (kind = 'bar'))] (rows=unknown)            |
|                                                      |       │  ├ Filter[(reference = 'bazinga')] (rows=0)                                    |
|                                                      |       │  │  └ Join[INNER | (subscription_id = id)] (rows=unknown)                      |
|                                                      |       │  │    ├ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                |
|                                                      |       │  │    └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)          |
|                                                      |       │  └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_rewrite_filter_on_outer_join_to_inner_join | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ Join[INNER | (cluster_id = id)] (rows=unknown)                                     |
|                                                      |     ├ Join[INNER | ((cluster_id = id) AND (kind = 'bar'))] (rows=unknown)              |
|                                                      |     │  ├ Filter[(reference = 'bazinga')] (rows=0)                                      |
|                                                      |     │  │  └ Join[INNER | (subscription_id = id)] (rows=unknown)                        |
|                                                      |     │  │    ├ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                  |
|                                                      |     │  │    └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)            |
|                                                      |     │  └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                    |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_rewrite_join_plan                          | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ Join[INNER | ((cluster_id = id) AND (kind = 'bar'))] (rows=unknown)              |
|                                                      |     │  ├ Filter[(reference = 'bazinga')] (rows=0)                                      |
|                                                      |     │  │  └ Join[INNER | (subscription_id = id)] (rows=unknown)                        |
|                                                      |     │  │    ├ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                  |
|                                                      |     │  │    └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)            |
|                                                      |     │  └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                    |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_extract_constant_join_conditions_to_filter | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |     │  └ Join[INNER | (cluster_id = id)] (rows=unknown)                                |
|                                                      |     │    ├ Filter[(reference = 'bazinga')] (rows=0)                                    |
|                                                      |     │    │  └ Join[INNER | (subscription_id = id)] (rows=unknown)                      |
|                                                      |     │    │    ├ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                |
|                                                      |     │    │    └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)          |
|                                                      |     │    └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                  |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_move_filter_beneath_join                   | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ Join[INNER | (cluster_id = id)] (rows=unknown)                                   |
|                                                      |     │  ├ Filter[(reference = 'bazinga')] (rows=0)                                      |
|                                                      |     │  │  └ Join[INNER | (subscription_id = id)] (rows=unknown)                        |
|                                                      |     │  │    ├ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                  |
|                                                      |     │  │    └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)            |
|                                                      |     │  └ Filter[(kind = 'bar')] (rows=0)                                               |
|                                                      |     │    └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                  |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_rewrite_join_plan                          | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ Filter[(reference = 'bazinga')] (rows=0)                                      |
|                                                      |     │  │  └ Join[INNER | (subscription_id = id)] (rows=unknown)                        |
|                                                      |     │  │    ├ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                  |
|                                                      |     │  │    └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)            |
|                                                      |     │  └ Filter[(kind = 'bar')] (rows=0)                                               |
|                                                      |     │    └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                  |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_move_filter_beneath_join                   | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ Join[INNER | (subscription_id = id)] (rows=unknown)                           |
|                                                      |     │  │  ├ Filter[(reference = 'bazinga')] (rows=0)                                   |
|                                                      |     │  │  │  └ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                 |
|                                                      |     │  │  └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)              |
|                                                      |     │  └ Filter[(kind = 'bar')] (rows=0)                                               |
|                                                      |     │    └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                  |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_rewrite_join_plan                          | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ HashJoin[(subscription_id = id)] (rows=unknown)                               |
|                                                      |     │  │  ├ Filter[(reference = 'bazinga')] (rows=0)                                   |
|                                                      |     │  │  │  └ Collect[doc.t3 | [id, reference] | true] (rows=unknown)                 |
|                                                      |     │  │  └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)              |
|                                                      |     │  └ Filter[(kind = 'bar')] (rows=0)                                               |
|                                                      |     │    └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                  |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_merge_filter_and_collect                   | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ HashJoin[(subscription_id = id)] (rows=unknown)                               |
|                                                      |     │  │  ├ Collect[doc.t3 | [id, reference] | (reference = 'bazinga')] (rows=unknown) |
|                                                      |     │  │  └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)              |
|                                                      |     │  └ Filter[(kind = 'bar')] (rows=0)                                               |
|                                                      |     │    └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                  |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_merge_filter_and_collect                   | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ HashJoin[(subscription_id = id)] (rows=unknown)                               |
|                                                      |     │  │  ├ Collect[doc.t3 | [id, reference] | (reference = 'bazinga')] (rows=unknown) |
|                                                      |     │  │  └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)              |
|                                                      |     │  └ Collect[doc.t2 | [cluster_id, kind] | (kind = 'bar')] (rows=unknown)          |
|                                                      |     └ Filter[(kind = 'bar')] (rows=0)                                                  |
|                                                      |       └ Rename[cluster_id, kind] AS temp (rows=unknown)                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_move_filter_beneath_rename                 | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ HashJoin[(subscription_id = id)] (rows=unknown)                               |
|                                                      |     │  │  ├ Collect[doc.t3 | [id, reference] | (reference = 'bazinga')] (rows=unknown) |
|                                                      |     │  │  └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)              |
|                                                      |     │  └ Collect[doc.t2 | [cluster_id, kind] | (kind = 'bar')] (rows=unknown)          |
|                                                      |     └ Rename[cluster_id, kind] AS temp (rows=0)                                        |
|                                                      |       └ Filter[(kind = 'bar')] (rows=0)                                                |
|                                                      |         └ Collect[doc.t2 | [cluster_id, kind] | true] (rows=unknown)                   |
| optimizer_merge_filter_and_collect                   | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ HashJoin[(subscription_id = id)] (rows=unknown)                               |
|                                                      |     │  │  ├ Collect[doc.t3 | [id, reference] | (reference = 'bazinga')] (rows=unknown) |
|                                                      |     │  │  └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)              |
|                                                      |     │  └ Collect[doc.t2 | [cluster_id, kind] | (kind = 'bar')] (rows=unknown)          |
|                                                      |     └ Rename[cluster_id, kind] AS temp (rows=unknown)                                  |
|                                                      |       └ Collect[doc.t2 | [cluster_id, kind] | (kind = 'bar')] (rows=unknown)           |
| Final logical plan                                   | Eval[id, reference] (rows=unknown)                                                     |
|                                                      |   └ HashJoin[(cluster_id = id)] (rows=unknown)                                         |
|                                                      |     ├ HashJoin[(cluster_id = id)] (rows=unknown)                                       |
|                                                      |     │  ├ HashJoin[(subscription_id = id)] (rows=unknown)                               |
|                                                      |     │  │  ├ Collect[doc.t3 | [id, reference] | (reference = 'bazinga')] (rows=unknown) |
|                                                      |     │  │  └ Collect[doc.t1 | [subscription_id, id] | true] (rows=unknown)              |
|                                                      |     │  └ Collect[doc.t2 | [cluster_id] | (kind = 'bar')] (rows=unknown)                |
|                                                      |     └ Rename[cluster_id] AS temp (rows=unknown)                                        |
|                                                      |       └ Collect[doc.t2 | [cluster_id] | (kind = 'bar')] (rows=unknown)                 |
+------------------------------------------------------+----------------------------------------------------------------------------------------+
EXPLAIN 14 rows in set (0.026 sec)

@mkleen mkleen marked this pull request as ready for review April 8, 2024 10:40
@mkleen mkleen changed the title Push constant join conditions down in outer joins Enable push-down of constant join conditions down in outer-joins Apr 8, 2024
@mkleen mkleen changed the title Enable push-down of constant join conditions down in outer-joins Enable push-down of constant join conditions in outer-joins Apr 8, 2024
@mkleen mkleen requested a review from mfussenegger April 9, 2024 13:53
@mkleen
Copy link
Contributor Author

mkleen commented Apr 9, 2024

@mfussenegger I made a second iteration to handle outer joins properly. Outer-joins only apply constant join conditions on their non-preserved side and get ignored on the other side:

  • Left-joins will only filter on the rhs
  • Right-joins will only filter on the lhs

Therefore we can only extract the filter in these cases on top of the join. The pushdown took that already into account, but we have to be aware of that also on the filter creation.

@mkleen
Copy link
Contributor Author

mkleen commented Apr 10, 2024

The test is still flaky, i will put this on hold.

@mkleen mkleen marked this pull request as draft April 10, 2024 16:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants