optimize count(1) performance on hive/iceberg table #45242

Closed
3 tasks done
dirtysalt opened this issue May 7, 2024 · 0 comments · Fixed by #43616
@dirtysalt dirtysalt added the type/enhancement Make an enhancement to StarRocks label May 7, 2024
imay pushed a commit that referenced this issue May 10, 2024
…um` (#43616)

Why I'm doing:
Right now the HDFS scanner optimization for count(1) outputs a const column with the expected count.

In extreme cases (large datasets), the number of chunks flowing through the pipeline becomes extremely large, and operator time and overhead time are not negligible.

Here is a profile of select count(*) from hive.hive_ssb100g_parquet.lineorder. To reproduce this extreme case, I changed the code to scale morsels by 20x and repeat row groups by 10x.

In the concurrency=1 case, the total time is 51s:

         - OverheadTime: 25s37ms
           - __MAX_OF_OverheadTime: 25s111ms
           - __MIN_OF_OverheadTime: 24s962ms

             - PullTotalTime: 12s376ms
               - __MAX_OF_PullTotalTime: 13s147ms
               - __MIN_OF_PullTotalTime: 11s885ms
What I'm doing:
Rewrite the count(1) query into a sum, so each row group reader emits only one chunk (size = 1).

With the rewrite, the total time is 9s.

The original plan looks like:

+----------------------------------+
| Explain String                   |
+----------------------------------+
| PLAN FRAGMENT 0                  |
|  OUTPUT EXPRS:18: count          |
|   PARTITION: UNPARTITIONED       |
|                                  |
|   RESULT SINK                    |
|                                  |
|   4:AGGREGATE (merge finalize)   |
|   |  output: count(18: count)    |
|   |  group by:                   |
|   |                              |
|   3:EXCHANGE                     |
|                                  |
| PLAN FRAGMENT 1                  |
|  OUTPUT EXPRS:                   |
|   PARTITION: RANDOM              |
|                                  |
|   STREAM DATA SINK               |
|     EXCHANGE ID: 03              |
|     UNPARTITIONED                |
|                                  |
|   2:AGGREGATE (update serialize) |
|   |  output: count(*)            |
|   |  group by:                   |
|   |                              |
|   1:Project                      |
|   |  <slot 20> : 1               |
|   |                              |
|   0:HdfsScanNode                 |
|      TABLE: lineorder            |
|      partitions=1/1              |
|      cardinality=600037902       |
|      avgRowSize=5.0              |
+----------------------------------+
And the rewritten plan looks like:

+-----------------------------------+
| Explain String                    |
+-----------------------------------+
| PLAN FRAGMENT 0                   |
|  OUTPUT EXPRS:18: count           |
|   PARTITION: UNPARTITIONED        |
|                                   |
|   RESULT SINK                     |
|                                   |
|   3:AGGREGATE (merge finalize)    |
|   |  output: sum(18: count)       |
|   |  group by:                    |
|   |                               |
|   2:EXCHANGE                      |
|                                   |
| PLAN FRAGMENT 1                   |
|  OUTPUT EXPRS:                    |
|   PARTITION: RANDOM               |
|                                   |
|   STREAM DATA SINK                |
|     EXCHANGE ID: 02               |
|     UNPARTITIONED                 |
|                                   |
|   1:AGGREGATE (update serialize)  |
|   |  output: sum(19: ___count___) |
|   |  group by:                    |
|   |                               |
|   0:HdfsScanNode                  |
|      TABLE: lineorder             |
|      partitions=1/1               |
|      cardinality=1                |
|      avgRowSize=1.0               |
+-----------------------------------+
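The equivalence behind the rewrite can be illustrated with a minimal sketch (hypothetical Python, not StarRocks code; chunk size and row-group sizes are made up for illustration): summing one precomputed per-row-group count gives the same answer as counting a const column row by row, while emitting one chunk per row group instead of one per chunk of rows.

```python
# Hypothetical sketch: why rewriting count(1) as sum(per-row-group counts)
# is equivalent, while emitting far fewer chunks into the pipeline.

CHUNK_SIZE = 4096  # assumed chunk size, for illustration only

def count_via_const_column(row_group_sizes):
    """Original approach: each row group emits ceil(n / CHUNK_SIZE) chunks
    of a const column, and the AGGREGATE node counts every row."""
    chunks = 0
    total = 0
    for n in row_group_sizes:
        for start in range(0, n, CHUNK_SIZE):
            rows = min(CHUNK_SIZE, n - start)
            chunks += 1
            total += rows  # count(1) over this chunk
    return total, chunks

def count_via_sum(row_group_sizes):
    """Rewritten approach: each row group emits a single chunk holding its
    precomputed row count, and the AGGREGATE node sums those counts."""
    chunks = len(row_group_sizes)
    total = sum(row_group_sizes)  # sum(___count___)
    return total, chunks

groups = [600_037, 412_900, 75_001]  # made-up row-group sizes
total_old, chunks_old = count_via_const_column(groups)
total_new, chunks_new = count_via_sum(groups)
assert total_old == total_new      # same answer
assert chunks_new < chunks_old     # far fewer chunks in the pipeline
```

Both paths return the same total; the rewrite just moves the per-row work into a per-row-group constant, which is why the scan node's cardinality drops to 1 in the rewritten plan.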
Fixes #45242

Signed-off-by: yanz <dirtysalt1987@gmail.com>
mergify bot pushed a commit that referenced this issue May 14, 2024
…um` (#43616)

(cherry picked from commit b6ca919)

# Conflicts:
#	java-extensions/hive-reader/src/main/java/com/starrocks/hive/reader/HiveScanner.java
#	test/sql/test_iceberg/R/test_iceberg_catalog
#	test/sql/test_iceberg/T/test_iceberg_catalog
dirtysalt added a commit to dirtysalt/starrocks that referenced this issue May 14, 2024
…um` (StarRocks#43616)


Signed-off-by: RyanZ <dirtysalt1987@gmail.com>
node pushed a commit to vivo/starrocks that referenced this issue May 17, 2024
…um` (StarRocks#43616)
