optimize count(1) performance on hive/iceberg table #45242
imay pushed a commit that referenced this issue on May 10, 2024:
…count(1) in hdfs scanner by rewriting plan to `sum` (#43616)

Why I'm doing this:

Right now the hdfs scanner optimization for count(1) outputs a const column of the expected count. In an extreme case (a large dataset), the number of chunks flowing through the pipeline becomes extremely large, and the operator time and overhead time are not negligible. Here is a profile of `select count(*) from hive.hive_ssb100g_parquet.lineorder`; to reproduce the extreme case, I changed the code to scale morsels by 20x and repeat row groups by 10x. In the concurrency=1 case, the total time is 51s:

```
- OverheadTime: 25s37ms
  - __MAX_OF_OverheadTime: 25s111ms
  - __MIN_OF_OverheadTime: 24s962ms
- PullTotalTime: 12s376ms
  - __MAX_OF_PullTotalTime: 13s147ms
  - __MIN_OF_PullTotalTime: 11s885ms
```

What I'm doing:

Rewrite the count(1) query into a sum, so that each row group reader emits only one chunk (size = 1). With this change, the total time is 9s.

The original plan looks like this:

```
+----------------------------------+
| Explain String                   |
+----------------------------------+
| PLAN FRAGMENT 0                  |
|  OUTPUT EXPRS:18: count          |
|   PARTITION: UNPARTITIONED       |
|                                  |
|   RESULT SINK                    |
|                                  |
|   4:AGGREGATE (merge finalize)   |
|   |  output: count(18: count)    |
|   |  group by:                   |
|   |                              |
|   3:EXCHANGE                     |
|                                  |
| PLAN FRAGMENT 1                  |
|  OUTPUT EXPRS:                   |
|   PARTITION: RANDOM              |
|                                  |
|   STREAM DATA SINK               |
|     EXCHANGE ID: 03              |
|     UNPARTITIONED                |
|                                  |
|   2:AGGREGATE (update serialize) |
|   |  output: count(*)            |
|   |  group by:                   |
|   |                              |
|   1:Project                      |
|   |  <slot 20> : 1               |
|   |                              |
|   0:HdfsScanNode                 |
|      TABLE: lineorder            |
|      partitions=1/1              |
|      cardinality=600037902       |
|      avgRowSize=5.0              |
+----------------------------------+
```

And the rewritten plan looks like this:

```
+-----------------------------------+
| Explain String                    |
+-----------------------------------+
| PLAN FRAGMENT 0                   |
|  OUTPUT EXPRS:18: count           |
|   PARTITION: UNPARTITIONED        |
|                                   |
|   RESULT SINK                     |
|                                   |
|   3:AGGREGATE (merge finalize)    |
|   |  output: sum(18: count)       |
|   |  group by:                    |
|   |                               |
|   2:EXCHANGE                      |
|                                   |
| PLAN FRAGMENT 1                   |
|  OUTPUT EXPRS:                    |
|   PARTITION: RANDOM               |
|                                   |
|   STREAM DATA SINK                |
|     EXCHANGE ID: 02               |
|     UNPARTITIONED                 |
|                                   |
|   1:AGGREGATE (update serialize)  |
|   |  output: sum(19: ___count___) |
|   |  group by:                    |
|   |                               |
|   0:HdfsScanNode                  |
|      TABLE: lineorder             |
|      partitions=1/1               |
|      cardinality=1                |
|      avgRowSize=1.0               |
+-----------------------------------+
```

Fixes #45242

Signed-off-by: yanz <dirtysalt1987@gmail.com>
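To make the scanner-side idea concrete, here is a minimal, hypothetical sketch (the class, record, and method names are illustrative, not StarRocks' actual scanner API): for a count-only query, each row-group reader emits a single-row chunk carrying that row group's row count from file metadata, and the update aggregate sums those values instead of counting const-1 rows.

```java
import java.util.Iterator;
import java.util.List;

// Sketch only: contrasts the old const-column behavior with the new
// one-chunk-per-row-group behavior described in the commit message above.
public class CountOnlyScanSketch {
    /** One row group's metadata; only the row count matters here. */
    record RowGroupMeta(long rowCount) {}

    /**
     * New behavior: one single-row "chunk" per row group, holding its row
     * count. The old behavior materialized rowCount const-1 rows, so a
     * 600M-row table pushed on the order of 150k chunks (at a typical
     * 4096-row chunk size) through the pipeline just to be recounted.
     */
    static Iterator<long[]> countOnlyChunks(List<RowGroupMeta> rowGroups) {
        return rowGroups.stream()
                .map(rg -> new long[] { rg.rowCount() }) // chunk of size 1
                .iterator();
    }

    public static void main(String[] args) {
        // Two row groups with 4096 and 1234 rows: the scan emits exactly
        // two chunks, and sum(4096, 1234) = 5330 replaces counting rows.
        List<RowGroupMeta> metas =
                List.of(new RowGroupMeta(4096), new RowGroupMeta(1234));
        long total = 0;
        for (Iterator<long[]> it = countOnlyChunks(metas); it.hasNext(); ) {
            total += it.next()[0];
        }
        System.out.println("count(1) = " + total); // prints: count(1) = 5330
    }
}
```

This also explains the cardinality=1 and avgRowSize=1.0 on the rewritten HdfsScanNode: the scan no longer produces one row per table row, only one small count row per row group reader.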
mergify bot pushed a commit that referenced this issue on May 14, 2024:
(Same commit message as above, cherry-picked as a backport.)

(cherry picked from commit b6ca919)

Conflicts:
- java-extensions/hive-reader/src/main/java/com/starrocks/hive/reader/HiveScanner.java
- test/sql/test_iceberg/R/test_iceberg_catalog
- test/sql/test_iceberg/T/test_iceberg_catalog
dirtysalt added a commit to dirtysalt/starrocks that referenced this issue on May 14, 2024:
(Same commit message as above, referenced as StarRocks#43616. Fixes StarRocks#45242.)

Signed-off-by: yanz <dirtysalt1987@gmail.com>
Signed-off-by: RyanZ <dirtysalt1987@gmail.com>
node pushed a commit to vivo/starrocks that referenced this issue on May 17, 2024:
(Same commit message as above, referenced as StarRocks#43616. Fixes StarRocks#45242.)
Labels: Enhancement

Linked pull requests:
- count(1) in hdfs scanner by rewriting plan to `sum` (backport #43616) #45622
- count(1) in hdfs scanner by rewriting plan to `sum` (backport #43616) #45618
- count(1) in hdfs scanner by rewriting plan to `sum` #43616