Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): call walkLogicalPlanTree for inner join/agg plans #1755

Merged
merged 3 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,14 @@ trait DefaultPlanner {
}.toList
} else toReduceLevel.plans

/*
* NOTE: this LocalPartitionReduceAggregateExec wrapper is always created even when toReduceLevel2
* contains only one plan; this may artificially bloat QueryStats. However, ExecPlans currently
* offer no simple method to update their dispatchers, and forceRootDispatcher here must be
* honored to support target-schema pushdowns (see materializeWithPushdown in the SingleClusterPlanner).
* Given that the work required to allow dispatcher updates and/or rework pushdown logic is nontrivial
* and the QueryStats bloat given by unnecessary aggregation plans is likely small, the fix is skipped for now.
*/
val reduceDispatcher = forceRootDispatcher.getOrElse(PlannerUtil.pickDispatcher(toReduceLevel2))
val reducer = LocalPartitionReduceAggregateExec(qContext, reduceDispatcher, toReduceLevel2, lp.operator, lp.params)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// more evidence that these two classes should be merged.
private def materializeAggregate(aggregate: Aggregate, queryContext: QueryContext): PlanResult = {
val plan = if (LogicalPlanUtils.hasDescendantAggregateOrJoin(aggregate.vectors)) {
val childPlan = materialize(aggregate.vectors, queryContext)
addAggregator(aggregate, queryContext, PlanResult(Seq(childPlan)))
val childPlanRes = walkLogicalPlanTree(aggregate.vectors, queryContext)
addAggregator(aggregate, queryContext, childPlanRes)
} else {
val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val (partitions, _, _, _) = resolvePartitionsAndRoutingKeys(aggregate, queryParams)
Expand Down Expand Up @@ -766,16 +766,16 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val rhsQueryContext = qContext.copy(origQueryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams].
copy(promQl = LogicalPlanParser.convertToQuery(logicalPlan.rhs)))

val lhsExec = this.materialize(logicalPlan.lhs, lhsQueryContext)
val rhsExec = this.materialize(logicalPlan.rhs, rhsQueryContext)
val lhsPlanRes = walkLogicalPlanTree(logicalPlan.lhs, lhsQueryContext)
val rhsPlanRes = walkLogicalPlanTree(logicalPlan.rhs, rhsQueryContext)
alextheimer marked this conversation as resolved.
Show resolved Hide resolved

val execPlan = if (logicalPlan.operator.isInstanceOf[SetOperator])
SetOperatorExec(qContext, InProcessPlanDispatcher(queryConfig), Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
SetOperatorExec(qContext, InProcessPlanDispatcher(queryConfig), lhsPlanRes.plans, rhsPlanRes.plans,
logicalPlan.operator, logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn), datasetMetricColumn,
rvRangeFromPlan(logicalPlan))
else
BinaryJoinExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
BinaryJoinExec(qContext, inProcessPlanDispatcher, lhsPlanRes.plans, rhsPlanRes.plans, logicalPlan.operator,
logicalPlan.cardinality, logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn),
LogicalPlanUtils.renameLabels(logicalPlan.include, datasetMetricColumn), datasetMetricColumn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,16 +390,16 @@ class ShardKeyRegexPlanner(val dataset: Dataset,

// FIXME, Optimize and push the lhs and rhs to wrapped planner if they belong to same partition

val lhsExec = materialize(logicalPlan.lhs, lhsQueryContext)
val rhsExec = materialize(logicalPlan.rhs, rhsQueryContext)
val lhsPlanRes = walkLogicalPlanTree(logicalPlan.lhs, lhsQueryContext)
val rhsPlanRes = walkLogicalPlanTree(logicalPlan.rhs, rhsQueryContext)

val execPlan = if (logicalPlan.operator.isInstanceOf[SetOperator])
SetOperatorExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
SetOperatorExec(qContext, inProcessPlanDispatcher, lhsPlanRes.plans, rhsPlanRes.plans, logicalPlan.operator,
logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn), datasetMetricColumn,
rvRangeFromPlan(logicalPlan))
else
BinaryJoinExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
BinaryJoinExec(qContext, inProcessPlanDispatcher, lhsPlanRes.plans, rhsPlanRes.plans, logicalPlan.operator,
logicalPlan.cardinality,
logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn),
Expand Down Expand Up @@ -429,12 +429,12 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
// there are any, the provided aggregation needs to be done using inProcess, else we can materialize the aggregate
// using the wrapped planner
val plan = if (LogicalPlanUtils.hasDescendantAggregateOrJoin(aggregate.vectors)) {
val childPlan = materialize(aggregate.vectors, queryContext)
val childPlanRes = walkLogicalPlanTree(aggregate.vectors, queryContext)
// We are here because we have descendent aggregate, if that was multi-partition, the dispatcher will
// be InProcessPlanDispatcher and adding the current aggregate using addAggregate will use the same dispatcher
// If the underlying plan however is not multi partition, adding the aggregator using addAggregator will
// use the same dispatcher
addAggregator(aggregate, queryContext, PlanResult(Seq(childPlan)))
addAggregator(aggregate, queryContext, childPlanRes)
} else {
val execPlans = generateExecWithoutRegex(aggregate,
LogicalPlan.getNonMetricShardKeyFilters(aggregate, dataset.options.nonMetricShardColumns).head, queryContext)
Expand Down