Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subquery cache & friends #21888

Open
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

sopel39
Copy link
Member

@sopel39 sopel39 commented May 9, 2024

Implement subquery cache for Hive/Iceberg/Delta

Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stalled since data is cached per split. Dedicated
"cache splits ids" include create time and change set
(in case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then common subplan is extracted.
2. Query plan is rewritten using caching plan alternatives
   (fallback to original subquery, cache data, load from cache)
3. SPI PlanSignatures are computed for each cached subquery
4. Splits are scheduled deterministically on nodes based on (PlanSignature, SplitId) pair
5. On the worker cache plugin (currently only memory based) will determine
   if cached data is available for a given split

@cla-bot cla-bot bot added the cla-signed label May 9, 2024
@github-actions github-actions bot added iceberg Iceberg connector delta-lake Delta Lake connector hive Hive connector labels May 9, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 2 times, most recently from 9f4aa11 to ad12339 Compare May 9, 2024 16:10

List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove optimizedLocalScheduling from NodeSchedulerConfig ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a separate PR out of the last commit ? It's unrelated to everything else and can be landed quickly

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can also need to change implementation of hive.force-local-scheduling. With current changes we're effectively forcing local scheduling by default in hive (unless node went down). That's probably not the right default as it could create hot spots in the cluster. Instead of flipping isRemotelyAccessible, that flag should now change whether or not host addresses are returned by hive connector and by default we shouldn't provide addresses from HDFS.

@sopel39 sopel39 force-pushed the ks/subquery_cache branch 5 times, most recently from c270355 to a72b5df Compare May 15, 2024 12:16
@github-actions github-actions bot added the ui Web UI label May 21, 2024
lukasz-stec and others added 11 commits May 21, 2024 23:41
ChooseAlternativeNode defines alternative sub-plans that can be used
to execute given part of the query.
The actual sub-plan is then chosen per split during task execution.
Alternative sub-plans cannot span multiple stages and are only supported
for source stages.

Co-authored-by: Assaf Bern <assaf.bern@starburstdata.com>
These methods are required by subquery cache to describe
split data for cache key purpose.

ConnectorPageSourceProvider#getUnenforcedPredicate
is used to describe what unenforced predicate will be
applied on split data.

ConnectorPageSourceProvider#prunePredicate is used
to simplify filter predicates on per split bases
(e.g. removing paritioning predicates that fully
contain split data)

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Dynamic row filtering performs fine-grained filtering of rows
at table scan level thus greatly improving performance of some queries.
With the new contract scheduler (pipeline or FTE) will schedule
remote accessible splits on selected nodes if such nodes are available
and only fallback to other nodes is nodes are no longer part of cluster.
Connector might have stalled node information while creating splits
which could result in selecting nodes which are now offline. Additionally,
in FTE mode nodes can go down so split addresses could no longer be valid
then task is restarted.

Additionally, this commit simplifies UniformNodeSelector optimizedLocalScheduling
which was hard to reason about and was not taking advantages of
recent improvements like adaptive split queue length.

Co-authored-by: Karol Sobczak <napewnotrafi@gmail.com>
CacheManager is a set of SPI classes for implementing
split level cache storage.

MemoryCacheManager is a high-performance implementation of
CacheManager that keeps cached data in revocable memory.
Cache table id together with split id and column id represent
rows produced by ConnectorPageSource for a given split.

Cache ids can also be used to canonicalise query plans
for the purpouse of comparison or cache key generation.

This commit implements cache ids for Hive, Iceberg, Delta and TPCH
connectors.

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Cache hit rate depend on deterministic split generation.
Hive connector has a concept of "initial splits" which
are smaller and there is a limited of them.
Therefore, if deterministic splits are
required, then initial splits must be disabled because
Hive split generation doesn't have guaranteed ordering.
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 3 times, most recently from f771ae0 to a0ab7fc Compare May 22, 2024 08:25
@sopel39 sopel39 marked this pull request as ready for review May 22, 2024 08:25
@sopel39 sopel39 changed the title WIP: Subquery cache & friends Subquery cache & friends May 22, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 3 times, most recently from 07a30e6 to 8abb4b9 Compare May 22, 2024 13:18
Dynamic filter id might be registered by both local join
and as coming from coordinator.
sopel39 and others added 2 commits May 23, 2024 13:48
CanonicalSubplanExtractor creates a canonical
representation of a subplan using cache ids
provided by the connector. Canonical subplans
are used to compare plans against each other
and enable extracting of common subplans.

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stalled since data is cached per split. Dedicated
"cache splits ids" include create time and change set
(in case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then common subplan is extracted.
2. Query plan is rewritten using caching plan alternatives
   (fallback to original subquery, cache data, load from cache)
3. SPI PlanSignatures are computed for each cached subquery
4. Splits are scheduled deterministically on nodes based on (PlanSignature, SplitId) pair
5. On the worker cache plugin (currently only memory based) will determine
   if cached data is available for a given split

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Co-authored-by: Raunaq Morarka <raunaqmorarka@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed delta-lake Delta Lake connector hive Hive connector iceberg Iceberg connector ui Web UI
Development

Successfully merging this pull request may close these issues.

None yet

4 participants