Improve performance of distinct aggregations #21907

Open
wants to merge 5 commits into
base: master

@Dith3r Dith3r commented May 10, 2024

Description

Improve performance of distinct aggregations by defining additional strategies that are selected based on properties of the aggregation source, for example the estimated number of distinct values (NDV).

The strategy to use for multiple distinct aggregations:
SINGLE_STEP computes distinct aggregations in a single step without any pre-aggregations.
    This strategy will perform poorly if the number of distinct grouping keys is small.
MARK_DISTINCT uses MarkDistinct for multiple distinct aggregations
    or for a mix of distinct and non-distinct aggregations.
PRE_AGGREGATE computes distinct aggregations using a combination of aggregation
    and pre-aggregation steps.
AUTOMATIC chooses the strategy automatically.

With AUTOMATIC, the single-step strategy is preferred. However, when concurrency is
limited by a small number of distinct grouping keys, an alternative strategy is chosen
based on input data statistics.
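
The selection described above can be sketched roughly as follows. This is a hedged illustration, not the optimizer rule added by this PR: the class `DistinctStrategyChooser`, the `choose` method, its parameters, the handling of missing statistics, and the `2 * availableParallelism` threshold are all assumptions made for the example. Only the four strategy names come from the description.

```java
// Hypothetical sketch; names and the threshold are assumptions, not Trino's implementation.
final class DistinctStrategyChooser
{
    // Strategy names taken from the PR description.
    enum DistinctAggregationsStrategy { SINGLE_STEP, MARK_DISTINCT, PRE_AGGREGATE, AUTOMATIC }

    static DistinctAggregationsStrategy choose(
            DistinctAggregationsStrategy configured,
            double estimatedGroupingKeyNdv,
            int availableParallelism)
    {
        // An explicitly configured strategy is used as-is.
        if (configured != DistinctAggregationsStrategy.AUTOMATIC) {
            return configured;
        }
        // Assumption: without statistics (NaN), keep the preferred single-step strategy.
        if (Double.isNaN(estimatedGroupingKeyNdv)) {
            return DistinctAggregationsStrategy.SINGLE_STEP;
        }
        // Assumption: this many distinct grouping keys is enough to keep all tasks busy.
        if (estimatedGroupingKeyNdv >= 2.0 * availableParallelism) {
            return DistinctAggregationsStrategy.SINGLE_STEP;
        }
        // Few distinct grouping keys limit concurrency, so fall back to pre-aggregation.
        return DistinctAggregationsStrategy.PRE_AGGREGATE;
    }

    public static void main(String[] args)
    {
        // A global aggregation has a single group, so its grouping-key NDV is 1.
        System.out.println(choose(DistinctAggregationsStrategy.AUTOMATIC, 1.0, 32));
        System.out.println(choose(DistinctAggregationsStrategy.AUTOMATIC, 1_000_000.0, 32));
    }
}
```

A query without a GROUP BY clause is the extreme case of "few grouping keys", which is why the sketch treats it as a candidate for the alternative strategy.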

| Strategy | Duration | Query |
| --- | --- | --- |
| MARK_DISTINCT | 5353 ms | `select count(ss_customer_sk), count(distinct ss_ticket_number) from hive.tpcds_sf1000_orc.store_sales;` |
| PRE_AGGREGATE | 2468 ms | `select count(ss_customer_sk), count(distinct ss_ticket_number) from hive.tpcds_sf1000_orc.store_sales;` |
| MARK_DISTINCT | 10109 ms | `select count(ss_quantity), count(distinct ss_item_sk), count(distinct ss_store_sk) from hive.tpcds_sf1000_orc.store_sales;` |
| PRE_AGGREGATE | 8253 ms | `select count(ss_quantity), count(distinct ss_item_sk), count(distinct ss_store_sk) from hive.tpcds_sf1000_orc.store_sales;` |
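
The first benchmarked query has the shape `count(x), count(distinct y)`. As a rough illustration of what evaluating such a query with a pre-aggregation step means, here is a minimal sketch in plain Java. It is not Trino's planner or operator code: the class `PreAggregateSketch`, the `Row` record, and the method name are hypothetical and exist only for this example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not how Trino represents or executes plans.
final class PreAggregateSketch
{
    // One input row: x feeds count(x), y feeds count(distinct y). A null field models SQL NULL.
    record Row(Integer x, Integer y) {}

    // Returns {count(x), count(distinct y)}.
    static long[] countAndCountDistinct(List<Row> rows)
    {
        // Step 1 (pre-aggregation): group by y and keep a partial count of non-null x per group.
        Map<Integer, Long> partialCounts = new LinkedHashMap<>();
        for (Row row : rows) {
            long increment = (row.x() != null) ? 1 : 0;
            partialCounts.merge(row.y(), increment, Long::sum);
        }

        // Step 2 (final aggregation): sum the partial counts and count the non-null groups.
        long countX = 0;
        long distinctY = 0;
        for (Map.Entry<Integer, Long> group : partialCounts.entrySet()) {
            countX += group.getValue();
            if (group.getKey() != null) {
                distinctY++;
            }
        }
        return new long[] {countX, distinctY};
    }

    public static void main(String[] args)
    {
        List<Row> rows = List.of(new Row(1, 10), new Row(2, 10), new Row(null, 20), new Row(4, null));
        long[] result = countAndCountDistinct(rows);
        System.out.println("count(x) = " + result[0] + ", count(distinct y) = " + result[1]);
    }
}
```

After the first step each distinct value of `y` appears exactly once, so the final step needs no deduplication of its own.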

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
(X) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

Extract the logic that determines whether direct distinct aggregation is applicable,
so that it can be reused in multiple optimizer rules.
@cla-bot cla-bot bot added the cla-signed label May 10, 2024
@github-actions github-actions bot added the docs, hudi, iceberg, delta-lake, and hive labels May 10, 2024
@Dith3r Dith3r force-pushed the ke/dist-aggr branch 4 times, most recently from 0d37869 to abce86f Compare May 10, 2024 10:35
The rule replaces `OptimizeMixedDistinctAggregations`
and adds support for multiple distinct aggregations.
Also rename the corresponding config property optimizer.mark-distinct-strategy to
optimizer.distinct-aggregations-strategy, and rename its values: NONE -> SINGLE_STEP and ALWAYS -> MARK_DISTINCT.

Replace optimizer.optimize-mixed-distinct-aggregations with a new
optimizer.distinct-aggregations-strategy value, `pre_aggregate`.

Use the estimated NDV of the aggregation source and the number
of grouping keys to decide whether the pre-aggregate strategy
should be used for a given aggregation.
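
The renames listed in the commit messages above can be summarised as a small mapping. This is an illustrative sketch only: the class `DistinctStrategyMigration` and both methods are hypothetical helpers, not part of Trino. The facts taken from the commit messages are the value renames NONE -> SINGLE_STEP and ALWAYS -> MARK_DISTINCT, and that the old optimizer.optimize-mixed-distinct-aggregations flag is replaced by the `pre_aggregate` value. Passing other values through unchanged is an assumption.

```java
import java.util.Locale;

// Hypothetical helper for translating old configuration values; not part of Trino.
final class DistinctStrategyMigration
{
    // Maps a value of the old optimizer.mark-distinct-strategy property
    // to a value of optimizer.distinct-aggregations-strategy.
    static String migrateMarkDistinctStrategy(String oldValue)
    {
        String normalized = oldValue.trim().toUpperCase(Locale.ROOT);
        return switch (normalized) {
            case "NONE" -> "SINGLE_STEP";
            case "ALWAYS" -> "MARK_DISTINCT";
            // Assumption: any other value keeps its name.
            default -> normalized;
        };
    }

    // Maps the old boolean optimizer.optimize-mixed-distinct-aggregations flag.
    // When the flag was off, the caller's current strategy is kept.
    static String migrateOptimizeMixedDistinctAggregations(boolean oldFlag, String currentStrategy)
    {
        return oldFlag ? "PRE_AGGREGATE" : currentStrategy;
    }

    public static void main(String[] args)
    {
        System.out.println(migrateMarkDistinctStrategy("none"));
        System.out.println(migrateMarkDistinctStrategy("always"));
        System.out.println(migrateOptimizeMixedDistinctAggregations(true, "AUTOMATIC"));
    }
}
```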

@lukasz-stec lukasz-stec left a comment

lgtm % comments

 {
-    return session.getSystemProperty(MARK_DISTINCT_STRATEGY, MarkDistinctStrategy.class);
+    return session.getSystemProperty(DISTINCT_AGGREGATIONS_STRATEGY, DistinctAggregationsStrategy.class);

you should merge this commit with the commit that re-adds the MARK_DISTINCT_STRATEGY property

@@ -683,6 +683,9 @@ public PlanOptimizers(
new RemoveRedundantIdentityProjections(),
new PushAggregationThroughOuterJoin(),
new ReplaceRedundantJoinWithSource(), // Run this after PredicatePushDown optimizer as it inlines filter constants
new DistinctAggregationToGroupBy(plannerContext), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
// It also is run before MultipleDistinctAggregationToMarkDistinct to take precedence f enabled

nit: typo, should be "precedence if enabled"
