Subquery cache & friends #21888
base: master
Conversation
sopel39 commented May 9, 2024 (edited)
Force-pushed 9f4aa11 to ad12339
List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {
Can we remove optimizedLocalScheduling from NodeSchedulerConfig?
Can you make a separate PR out of the last commit? It's unrelated to everything else and can be landed quickly.
I think we also need to change the implementation of hive.force-local-scheduling. With the current changes we're effectively forcing local scheduling by default in Hive (unless a node went down). That's probably not the right default, as it could create hot spots in the cluster. Instead of flipping isRemotelyAccessible, that flag should now control whether host addresses are returned by the Hive connector, and by default we shouldn't provide addresses from HDFS.
Force-pushed c270355 to a72b5df
Force-pushed a72b5df to 4f474e2
Force-pushed 4f474e2 to a062f85
ChooseAlternativeNode defines alternative sub-plans that can be used to execute a given part of the query. The actual sub-plan is then chosen per split during task execution. Alternative sub-plans cannot span multiple stages and are only supported for source stages.
Co-authored-by: Assaf Bern <assaf.bern@starburstdata.com>
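The per-split choice between alternatives can be sketched as below. This is a minimal illustration only: AlternativeChooser, the plan-name strings, and the cached-split set are hypothetical stand-ins, not the actual ChooseAlternativeNode API.

```java
import java.util.Set;

// Hypothetical sketch: at task execution time, each split picks one of the
// alternative sub-plans, e.g. "load from cache" when the split's data is
// already cached, otherwise the original scan.
public class AlternativeChooser {
    public static String choose(String splitId, Set<String> cachedSplits) {
        // the decision is made per split, not per stage
        return cachedSplits.contains(splitId) ? "LOAD_FROM_CACHE" : "ORIGINAL_SCAN";
    }
}
```

The key point matching the commit message: the plan shape is fixed at planning time, but which alternative runs is decided independently for every split.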
These methods are required by the subquery cache to describe split data for cache-key purposes. ConnectorPageSourceProvider#getUnenforcedPredicate describes which unenforced predicate will be applied to split data. ConnectorPageSourceProvider#prunePredicate simplifies filter predicates on a per-split basis (e.g. removing partitioning predicates that fully contain split data).
Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
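The pruning idea can be sketched with a simplified predicate model. Everything below is illustrative (a map of column-to-required-value instead of Trino's TupleDomain, and a hypothetical PredicatePruning class), not the real SPI:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-split predicate pruning: a clause on a partition column is
// redundant for a split whose partition value already satisfies it, so it can
// be dropped, yielding a smaller, more reusable cache key.
public class PredicatePruning {
    // predicate: column -> required value; splitPartitions: column -> this split's partition value
    public static Map<String, String> prunePredicate(Map<String, String> predicate,
                                                     Map<String, String> splitPartitions) {
        Map<String, String> pruned = new HashMap<>();
        for (Map.Entry<String, String> clause : predicate.entrySet()) {
            String splitValue = splitPartitions.get(clause.getKey());
            // keep only clauses not already enforced by the split's partition value
            if (splitValue == null || !splitValue.equals(clause.getValue())) {
                pruned.put(clause.getKey(), clause.getValue());
            }
        }
        return pruned;
    }
}
```

For example, a predicate `ds = '2024-05-09' AND region = 'eu'` applied to a split from partition `ds = '2024-05-09'` prunes down to just the `region` clause.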
Dynamic row filtering performs fine-grained filtering of rows at the table scan level, thus greatly improving the performance of some queries.
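The effect can be sketched as follows. This is an illustration of the concept only (the real implementation filters positions inside pages, not boxed lists; DynamicRowFilter and its shape are hypothetical):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of dynamic row filtering: rows whose join key falls outside the
// domain collected from the join's build side are dropped at the scan,
// before they ever reach the join operator.
public class DynamicRowFilter {
    public static List<Long> filterRows(List<Long> scannedKeys, Set<Long> buildSideDomain) {
        return scannedKeys.stream()
                .filter(buildSideDomain::contains)
                .collect(Collectors.toList());
    }
}
```

Filtering row-by-row at the scan is finer-grained than split- or stripe-level pruning, which is where the speedup for selective joins comes from.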
With the new contract, the scheduler (pipelined or FTE) will schedule remotely accessible splits on the selected nodes if such nodes are available, and only fall back to other nodes if those nodes are no longer part of the cluster. A connector might have stale node information while creating splits, which could result in selecting nodes that are now offline. Additionally, in FTE mode nodes can go down, so split addresses may no longer be valid when a task is restarted. This commit also simplifies UniformNodeSelector's optimizedLocalScheduling, which was hard to reason about and did not take advantage of recent improvements like adaptive split queue length.
Co-authored-by: Karol Sobczak <napewnotrafi@gmail.com>
CacheManager is a set of SPI classes for implementing split-level cache storage. MemoryCacheManager is a high-performance implementation of CacheManager that keeps cached data in revocable memory.
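A toy version of such a split-level cache is sketched below. The names and shapes (SplitCache, String "pages", a size-based revoke) are illustrative stand-ins, not the actual CacheManager SPI; "revocable" here just means the engine can ask the cache to shed entries to free memory.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;

// Minimal sketch in the spirit of a split-level cache: data is stored and
// looked up per split id, and can be revoked (evicted) under memory pressure.
public class SplitCache {
    // insertion order lets revoke() evict oldest entries first
    private final LinkedHashMap<String, List<String>> cache = new LinkedHashMap<>();

    public void store(String splitId, List<String> pages) {
        cache.put(splitId, List.copyOf(pages));
    }

    public Optional<List<String>> load(String splitId) {
        return Optional.ofNullable(cache.get(splitId));
    }

    // evict insertion-ordered entries until at most maxEntries remain
    public void revoke(int maxEntries) {
        var it = cache.entrySet().iterator();
        while (cache.size() > maxEntries && it.hasNext()) {
            it.next();
            it.remove();
        }
    }
}
```

Because eviction is always possible, a cache miss must fall back to the original scan, which is exactly what the plan alternatives above provide.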
Cache table id together with split id and column id represents rows produced by ConnectorPageSource for a given split. Cache ids can also be used to canonicalise query plans for the purpose of comparison or cache-key generation. This commit implements cache ids for the Hive, Iceberg, Delta and TPCH connectors.
Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Cache hit rate depends on deterministic split generation. The Hive connector has a concept of "initial splits", which are smaller and limited in number. Therefore, if deterministic splits are required, initial splits must be disabled, because Hive split generation doesn't have guaranteed ordering.
Force-pushed f771ae0 to a0ab7fc
Force-pushed 07a30e6 to 8abb4b9
A dynamic filter id might be registered both by a local join and as coming from the coordinator.
Force-pushed 8abb4b9 to 865c615
CanonicalSubplanExtractor creates a canonical representation of a subplan using cache ids provided by the connector. Canonical subplans are used to compare plans against each other and enable extraction of common subplans.
Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Subquery cache is a lightweight mechanism for caching source stage computations. It works across queries, but also within a query if similar subqueries are identified. Subquery cache works with both streaming and FTE mode. Cached results are never stale, since data is cached per split. Dedicated "cache split ids" include create time and change set (in the case of Delta/Iceberg). Subquery cache works as follows:
1. During planning, subqueries eligible for caching are identified. If there are similar subqueries within a query, then a common subplan is extracted.
2. The query plan is rewritten using caching plan alternatives (fallback to original subquery, cache data, load from cache).
3. SPI PlanSignatures are computed for each cached subquery.
4. Splits are scheduled deterministically on nodes based on the (PlanSignature, SplitId) pair.
5. On the worker, the cache plugin (currently only memory based) determines whether cached data is available for a given split.
Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Co-authored-by: Raunaq Morarka <raunaqmorarka@gmail.com>
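Step 4, deterministic split placement, can be sketched as hashing the (PlanSignature, SplitId) pair onto the list of live nodes, so a split's cached data lands on the same worker across queries. CacheAwareScheduler and the string-based signatures are hypothetical; the real scheduler works on richer types and handles node changes:

```java
import java.util.List;

// Sketch: pick a node deterministically from (plan signature, split id) so
// repeated executions of the same subquery route each split to the same
// worker, where its cached data (if any) lives.
public class CacheAwareScheduler {
    public static String selectNode(String planSignature, String splitId, List<String> nodes) {
        // floorMod keeps the index non-negative for any hashCode value
        int index = Math.floorMod((planSignature + "|" + splitId).hashCode(), nodes.size());
        return nodes.get(index);
    }
}
```

Note the trade-off this makes explicit: determinism is what makes cache hits possible, but when the node set changes (a worker leaves), affected splits map to new nodes and their cache entries are simply cold there.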
Force-pushed 865c615 to 74302ec