[Feature] Optimize count(1) in hdfs scanner by rewriting plan to sum #43616 (Merged)

Conversation
dirtysalt force-pushed the count-star-optimization branch 6 times, most recently from 96d59db to 5d2635c (April 11, 2024 15:49)
mofeiatwork previously approved these changes (Apr 25, 2024)
zombee0 approved these changes (May 6, 2024)
LGTM
Youngwb reviewed (May 7, 2024)
stephen-shelby previously approved these changes (May 7, 2024)
Youngwb previously approved these changes (May 7, 2024)
dirtysalt dismissed stale reviews from Youngwb, stephen-shelby, and mofeiatwork via 626cfea (May 7, 2024 16:51)
Youngwb previously approved these changes (May 8, 2024)
dirtysalt force-pushed the count-star-optimization branch from 26ee770 to e88bce5 (May 9, 2024 17:58)
Quality Gate failed: failed conditions.
[FE Incremental Coverage Report] ✅ pass: 131 / 151 (86.75%)
[BE Incremental Coverage Report] ✅ pass: 100 / 109 (91.74%)
@mergify backport branch-3.3
✅ Backports have been created
mergify bot pushed a commit that referenced this pull request (May 14, 2024):
[Feature] Optimize `count(1)` in hdfs scanner by rewriting plan to `sum` (#43616)
(cherry picked from commit b6ca919; conflicts: java-extensions/hive-reader/src/main/java/com/starrocks/hive/reader/HiveScanner.java, test/sql/test_iceberg/R/test_iceberg_catalog, test/sql/test_iceberg/T/test_iceberg_catalog)
dirtysalt added a commit to dirtysalt/starrocks that referenced this pull request (May 14, 2024):
[Feature] Optimize `count(1)` in hdfs scanner by rewriting plan to `sum` (StarRocks#43616)
node pushed a commit to vivo/starrocks that referenced this pull request (May 17, 2024):
[Feature] Optimize `count(1)` in hdfs scanner by rewriting plan to `sum` (StarRocks#43616)
Why I'm doing:
Right now, the hdfs scanner optimization for count(1) is to output a const column of the expected count. In an extreme case (a large dataset), the number of chunks flowing through the pipeline becomes huge, and operator time and overhead time are no longer negligible.
Here is a profile of select count(*) from hive.hive_ssb100g_parquet.lineorder. To reproduce this extreme case, I changed the code to scale morsels by 20x and repeat row groups by 10x. In the concurrency=1 case, total time is 51s:
- OverheadTime: 25s37ms (max 25s111ms, min 24s962ms)
- PullTotalTime: 12s376ms (max 13s147ms, min 11s885ms)
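To see why the const-column approach hurts, here is a minimal Java sketch of the arithmetic, not StarRocks code: the real scanner lives in the C++ BE, and the class name, method names, and the 4096-row chunk size are assumptions for illustration. Covering all 600,037,902 lineorder rows with const-column chunks means roughly 146K chunks traverse the pipeline, each paying per-chunk operator and scheduling overhead.

```java
// A minimal sketch, assuming a 4096-row chunk size; names are hypothetical,
// not the StarRocks BE API (which is C++).
public class CountChunkSketch {
    static final int CHUNK_SIZE = 4096; // assumed default pipeline chunk size

    // Old behavior: for count(1), the scanner emits const-column chunks that
    // still cover every row, so the AGGREGATE node sees one entry per row.
    static long chunksWithConstColumn(long rows) {
        return (rows + CHUNK_SIZE - 1) / CHUNK_SIZE; // ceil(rows / CHUNK_SIZE)
    }

    // New behavior: each row group reader emits a single one-row chunk that
    // carries the row group's row count, which the AGGREGATE node sums.
    static long chunksWithSumRewrite(long rowGroups) {
        return rowGroups; // one chunk per row group, regardless of row count
    }

    public static void main(String[] args) {
        long lineorderRows = 600_037_902L; // cardinality from the plan below
        System.out.printf("const-column chunks: %,d%n", chunksWithConstColumn(lineorderRows));
        // => const-column chunks: 146,494
        System.out.printf("sum-rewrite chunks per row group: %,d%n", chunksWithSumRewrite(1));
        // => sum-rewrite chunks per row group: 1
    }
}
```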
What I'm doing:
Rewrite the count(1) query into a sum, so each row group reader emits only one chunk (size = 1). Total time drops to 9s.
The original plan is:
+----------------------------------+
| Explain String                   |
+----------------------------------+
| PLAN FRAGMENT 0                  |
|  OUTPUT EXPRS:18: count          |
|   PARTITION: UNPARTITIONED       |
|                                  |
|   RESULT SINK                    |
|                                  |
|   4:AGGREGATE (merge finalize)   |
|   |  output: count(18: count)    |
|   |  group by:                   |
|   |                              |
|   3:EXCHANGE                     |
|                                  |
| PLAN FRAGMENT 1                  |
|  OUTPUT EXPRS:                   |
|   PARTITION: RANDOM              |
|                                  |
|   STREAM DATA SINK               |
|     EXCHANGE ID: 03              |
|     UNPARTITIONED                |
|                                  |
|   2:AGGREGATE (update serialize) |
|   |  output: count(*)            |
|   |  group by:                   |
|   |                              |
|   1:Project                      |
|   |  <slot 20> : 1               |
|   |                              |
|   0:HdfsScanNode                 |
|      TABLE: lineorder            |
|      partitions=1/1              |
|      cardinality=600037902       |
|      avgRowSize=5.0              |
+----------------------------------+
And the rewritten plan is:
+-----------------------------------+
| Explain String                    |
+-----------------------------------+
| PLAN FRAGMENT 0                   |
|  OUTPUT EXPRS:18: count           |
|   PARTITION: UNPARTITIONED        |
|                                   |
|   RESULT SINK                     |
|                                   |
|   3:AGGREGATE (merge finalize)    |
|   |  output: sum(18: count)       |
|   |  group by:                    |
|   |                               |
|   2:EXCHANGE                      |
|                                   |
| PLAN FRAGMENT 1                   |
|  OUTPUT EXPRS:                    |
|   PARTITION: RANDOM               |
|                                   |
|   STREAM DATA SINK                |
|     EXCHANGE ID: 02               |
|     UNPARTITIONED                 |
|                                   |
|   1:AGGREGATE (update serialize)  |
|   |  output: sum(19: ___count___) |
|   |  group by:                    |
|   |                               |
|   0:HdfsScanNode                  |
|      TABLE: lineorder             |
|      partitions=1/1               |
|      cardinality=1                |
|      avgRowSize=1.0               |
+-----------------------------------+
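For intuition, here is a hedged Java sketch of the rewrite idea; it is not the actual RewriteSimpleAggToHDFSScanRule, and AggNode, ScanNode, and the bare-string matching are simplified stand-ins for the optimizer's operator tree. Presumably the real rule also guards against cases where exact per-split row counts are unavailable (for example, predicates on the scan); the sketch omits all such checks.

```java
// A hedged sketch of the rewrite idea only, with hypothetical AggNode/ScanNode
// stand-ins; not the actual RewriteSimpleAggToHDFSScanRule implementation.
public class SimpleAggRewriteSketch {
    record ScanNode(String table, boolean emitsRowCountColumn) {}
    record AggNode(String function, String arg, ScanNode child) {}

    // Rewrite AGGREGATE(count(1)/count(*)) over an HDFS scan into
    // AGGREGATE(sum(___count___)) over a scan that emits one row per
    // row group, carrying that row group's row count.
    static AggNode rewriteCountToSum(AggNode agg) {
        boolean simpleCount = agg.function().equals("count")
                && (agg.arg().equals("1") || agg.arg().equals("*"));
        if (!simpleCount) {
            return agg; // the rule only fires on a bare count(1)/count(*)
        }
        ScanNode countingScan = new ScanNode(agg.child().table(), true);
        return new AggNode("sum", "___count___", countingScan);
    }

    public static void main(String[] args) {
        AggNode original = new AggNode("count", "1", new ScanNode("lineorder", false));
        System.out.println(rewriteCountToSum(original));
        // => AggNode[function=sum, arg=___count___,
        //    child=ScanNode[table=lineorder, emitsRowCountColumn=true]]
    }
}
```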
Fixes #45242
What type of PR is this: Feature
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: