[Feature] Optimize count(1) in hdfs scanner by rewriting plan to sum #43616

Merged

merged 13 commits into StarRocks:main from count-star-optimization on May 10, 2024

Conversation

@dirtysalt (Contributor) commented Apr 4, 2024

Why I'm doing:

Right now, the hdfs scanner optimization for count(1) is to output a const column holding the expected count.

In extreme cases (large datasets), the number of chunks flowing through the pipeline becomes extremely large, and the operator time and overhead time are not negligible.

Here is a profile of select count(*) from hive.hive_ssb100g_parquet.lineorder. To reproduce this extreme case, I changed the code to scale morsels by 20x and repeat row groups by 10x.

In the concurrency=1 case, total time is 51s:

         - OverheadTime: 25s37ms
           - __MAX_OF_OverheadTime: 25s111ms
           - __MIN_OF_OverheadTime: 24s962ms

             - PullTotalTime: 12s376ms
               - __MAX_OF_PullTotalTime: 13s147ms
               - __MIN_OF_PullTotalTime: 11s885ms
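
For a rough sense of where that overhead comes from, here is a back-of-the-envelope estimate as a small Java sketch. The 4096 rows-per-chunk figure is an assumed default chunk size, and treating the 20x/10x reproduction scaling as a plain row multiplier is also an assumption:

    // Estimate how many chunks the const-column approach pushes through the
    // pipeline. chunkSize = 4096 is an assumed default; the 20x/10x factors
    // mirror the morsel/row-group scaling used to reproduce the extreme case.
    public class ChunkFloodEstimate {
        public static void main(String[] args) {
            long rows = 600_037_902L;      // lineorder cardinality from the plan
            long scaledRows = rows * 20 * 10;
            long chunkSize = 4096;
            long chunks = (scaledRows + chunkSize - 1) / chunkSize;
            System.out.println("~" + chunks + " chunks"); // roughly 29.3 million
        }
    }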

What I'm doing:

Rewrite the count(1) query into a sum-like query, so that each row group reader only emits one chunk (size = 1).

With this change, total time is 9s.

The original plan looks like this:

+----------------------------------+
| Explain String                   |
+----------------------------------+
| PLAN FRAGMENT 0                  |
|  OUTPUT EXPRS:18: count          |
|   PARTITION: UNPARTITIONED       |
|                                  |
|   RESULT SINK                    |
|                                  |
|   4:AGGREGATE (merge finalize)   |
|   |  output: count(18: count)    |
|   |  group by:                   |
|   |                              |
|   3:EXCHANGE                     |
|                                  |
| PLAN FRAGMENT 1                  |
|  OUTPUT EXPRS:                   |
|   PARTITION: RANDOM              |
|                                  |
|   STREAM DATA SINK               |
|     EXCHANGE ID: 03              |
|     UNPARTITIONED                |
|                                  |
|   2:AGGREGATE (update serialize) |
|   |  output: count(*)            |
|   |  group by:                   |
|   |                              |
|   1:Project                      |
|   |  <slot 20> : 1               |
|   |                              |
|   0:HdfsScanNode                 |
|      TABLE: lineorder            |
|      partitions=1/1              |
|      cardinality=600037902       |
|      avgRowSize=5.0              |
+----------------------------------+

And the rewritten plan looks like this:

+-----------------------------------+
| Explain String                    |
+-----------------------------------+
| PLAN FRAGMENT 0                   |
|  OUTPUT EXPRS:18: count           |
|   PARTITION: UNPARTITIONED        |
|                                   |
|   RESULT SINK                     |
|                                   |
|   3:AGGREGATE (merge finalize)    |
|   |  output: sum(18: count)       |
|   |  group by:                    |
|   |                               |
|   2:EXCHANGE                      |
|                                   |
| PLAN FRAGMENT 1                   |
|  OUTPUT EXPRS:                    |
|   PARTITION: RANDOM               |
|                                   |
|   STREAM DATA SINK                |
|     EXCHANGE ID: 02               |
|     UNPARTITIONED                 |
|                                   |
|   1:AGGREGATE (update serialize)  |
|   |  output: sum(19: ___count___) |
|   |  group by:                    |
|   |                               |
|   0:HdfsScanNode                  |
|      TABLE: lineorder             |
|      partitions=1/1               |
|      cardinality=1                |
|      avgRowSize=1.0               |
+-----------------------------------+
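
For intuition, here is a minimal, self-contained Java sketch of the idea behind sum(19: ___count___). All names except ___count___ are hypothetical; the real rule (RewriteSimpleAggToHDFSScanRule.java, visible in the coverage report below) transforms optimizer plan trees rather than Java collections:

    import java.util.List;

    // Each row group contributes a single pre-counted value ("___count___")
    // instead of one constant row per input row, so the scan emits one
    // size-1 chunk per row group and the aggregate sums those values.
    public class CountToSumSketch {
        record RowGroup(long rowCount) {}

        static long countViaSum(List<RowGroup> rowGroups) {
            return rowGroups.stream()
                    .mapToLong(RowGroup::rowCount) // one ___count___ value per row group
                    .sum();                        // sum(19: ___count___) in the plan
        }

        public static void main(String[] args) {
            List<RowGroup> groups = List.of(new RowGroup(4_096), new RowGroup(600_033_806));
            System.out.println(countViaSum(groups)); // 600037902, same as count(*)
        }
    }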

Fixes #45242

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

@dirtysalt dirtysalt requested review from a team as code owners April 4, 2024 21:15
@wanpengfei-git wanpengfei-git requested a review from a team April 4, 2024 21:15
@dirtysalt dirtysalt force-pushed the count-star-optimization branch 6 times, most recently from 96d59db to 5d2635c Compare April 11, 2024 15:49
@dirtysalt dirtysalt enabled auto-merge (squash) April 13, 2024 02:52
mofeiatwork previously approved these changes Apr 25, 2024
@zombee0 (Contributor) left a comment:

LGTM

stephen-shelby previously approved these changes May 7, 2024
@stephen-shelby stephen-shelby self-requested a review May 7, 2024 02:46
Youngwb previously approved these changes May 7, 2024
Youngwb previously approved these changes May 8, 2024
Signed-off-by: yanz <dirtysalt1987@gmail.com>
@dirtysalt dirtysalt dismissed stale reviews from packy92 and stephen-shelby via 92dcabc May 9, 2024 23:03
sonarcloud bot commented May 9, 2024

Quality Gate failed

Failed conditions
5.0% Duplication on New Code (required ≤ 3%)

See analysis details on SonarCloud


[FE Incremental Coverage Report]

pass : 131 / 151 (86.75%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 com/starrocks/connector/iceberg/cost/IcebergStatisticProvider.java 3 4 75.00% [251]
🔵 com/starrocks/catalog/IcebergTable.java 4 5 80.00% [179]
🔵 com/starrocks/sql/optimizer/rule/transformation/RewriteSimpleAggToHDFSScanRule.java 114 132 86.36% [100, 101, 102, 110, 111, 163, 164, 168, 169, 170, 201, 217, 222, 228, 243, 245, 247, 262]
🔵 com/starrocks/sql/optimizer/Optimizer.java 3 3 100.00% []
🔵 com/starrocks/qe/SessionVariable.java 4 4 100.00% []
🔵 com/starrocks/sql/optimizer/rule/RuleSet.java 3 3 100.00% []


[BE Incremental Coverage Report]

pass : 100 / 109 (91.74%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 be/src/formats/orc/orc_chunk_reader.cpp 0 6 00.00% [1222, 1223, 1224, 1225, 1226, 1227]
🔵 be/src/exec/hdfs_scanner_orc.cpp 21 22 95.45% [571]
🔵 be/src/exec/hdfs_scanner.cpp 39 41 95.12% [415, 453]
🔵 be/src/exec/jni_scanner.cpp 23 23 100.00% []
🔵 be/src/formats/parquet/file_reader.cpp 10 10 100.00% []
🔵 be/src/exec/hdfs_scanner_text.cpp 7 7 100.00% []

@imay imay disabled auto-merge May 10, 2024 01:26
@imay imay merged commit b6ca919 into StarRocks:main May 10, 2024
42 of 44 checks passed
@dirtysalt dirtysalt deleted the count-star-optimization branch May 11, 2024 16:00
@dirtysalt (Contributor, Author) commented:

@mergify backport branch-3.3

mergify bot commented May 14, 2024

backport branch-3.3

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request May 14, 2024
[Feature] Optimize count(1) in hdfs scanner by rewriting plan to `sum` (#43616)


Signed-off-by: yanz <dirtysalt1987@gmail.com>
(cherry picked from commit b6ca919)

# Conflicts:
#	java-extensions/hive-reader/src/main/java/com/starrocks/hive/reader/HiveScanner.java
#	test/sql/test_iceberg/R/test_iceberg_catalog
#	test/sql/test_iceberg/T/test_iceberg_catalog
wanpengfei-git pushed a commit that referenced this pull request May 14, 2024
[Feature] Optimize count(1) in hdfs scanner by rewriting plan to `sum` (backport #43616) (#45618)

Signed-off-by: yanz <dirtysalt1987@gmail.com>
Co-authored-by: RyanZ <dirtysalt1987@gmail.com>
dirtysalt added a commit to dirtysalt/starrocks that referenced this pull request May 14, 2024
[Feature] Optimize count(1) in hdfs scanner by rewriting plan to `sum` (StarRocks#43616)


Signed-off-by: yanz <dirtysalt1987@gmail.com>

Signed-off-by: RyanZ <dirtysalt1987@gmail.com>
node pushed a commit to vivo/starrocks that referenced this pull request May 17, 2024
[Feature] Optimize count(1) in hdfs scanner by rewriting plan to `sum` (StarRocks#43616)


Signed-off-by: yanz <dirtysalt1987@gmail.com>

Successfully merging this pull request may close these issues.

optimize count(1) performance on hive/iceberg table