Skip to content

Commit

Permalink
fix(query): call walkLogicalPlanTree for inner join/agg plans (#1755)
Browse files Browse the repository at this point in the history
- Additionally fixes transformer application to RemoteExec plans.
  • Loading branch information
alextheimer committed Apr 18, 2024
1 parent eb95183 commit 7fa0f0c
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 57 deletions.
Expand Up @@ -342,6 +342,14 @@ trait DefaultPlanner {
}.toList
} else toReduceLevel.plans

/*
* NOTE: this LocalPartitionReduceAggregateExec wrapper is always created even when toReduceLevel2
* contains only one plan; this may artificially bloat QueryStats. However, ExecPlans currently
* offer no simple method to update their dispatchers, and forceRootDispatcher here must be
* honored to support target-schema pushdowns (see materializeWithPushdown in the SingleClusterPlanner).
* Given that the work required to allow dispatcher updates and/or rework pushdown logic is nontrivial
* and the QueryStats bloat given by unnecessary aggregation plans is likely small, the fix is skipped for now.
*/
val reduceDispatcher = forceRootDispatcher.getOrElse(PlannerUtil.pickDispatcher(toReduceLevel2))
val reducer = LocalPartitionReduceAggregateExec(qContext, reduceDispatcher, toReduceLevel2, lp.operator, lp.params)

Expand Down
Expand Up @@ -487,8 +487,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// more evidence that these two classes should be merged.
private def materializeAggregate(aggregate: Aggregate, queryContext: QueryContext): PlanResult = {
val plan = if (LogicalPlanUtils.hasDescendantAggregateOrJoin(aggregate.vectors)) {
val childPlan = materialize(aggregate.vectors, queryContext)
addAggregator(aggregate, queryContext, PlanResult(Seq(childPlan)))
val childPlanRes = walkLogicalPlanTree(aggregate.vectors, queryContext)
addAggregator(aggregate, queryContext, childPlanRes)
} else {
val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val (partitions, _, _, _) = resolvePartitionsAndRoutingKeys(aggregate, queryParams)
Expand Down Expand Up @@ -766,16 +766,16 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val rhsQueryContext = qContext.copy(origQueryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams].
copy(promQl = LogicalPlanParser.convertToQuery(logicalPlan.rhs)))

val lhsExec = this.materialize(logicalPlan.lhs, lhsQueryContext)
val rhsExec = this.materialize(logicalPlan.rhs, rhsQueryContext)
val lhsPlanRes = walkLogicalPlanTree(logicalPlan.lhs, lhsQueryContext)
val rhsPlanRes = walkLogicalPlanTree(logicalPlan.rhs, rhsQueryContext)

val execPlan = if (logicalPlan.operator.isInstanceOf[SetOperator])
SetOperatorExec(qContext, InProcessPlanDispatcher(queryConfig), Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
SetOperatorExec(qContext, InProcessPlanDispatcher(queryConfig), lhsPlanRes.plans, rhsPlanRes.plans,
logicalPlan.operator, logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn), datasetMetricColumn,
rvRangeFromPlan(logicalPlan))
else
BinaryJoinExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
BinaryJoinExec(qContext, inProcessPlanDispatcher, lhsPlanRes.plans, rhsPlanRes.plans, logicalPlan.operator,
logicalPlan.cardinality, logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn),
LogicalPlanUtils.renameLabels(logicalPlan.include, datasetMetricColumn), datasetMetricColumn,
Expand Down
Expand Up @@ -390,16 +390,16 @@ class ShardKeyRegexPlanner(val dataset: Dataset,

// FIXME, Optimize and push the lhs and rhs to wrapped planner if they belong to same partition

val lhsExec = materialize(logicalPlan.lhs, lhsQueryContext)
val rhsExec = materialize(logicalPlan.rhs, rhsQueryContext)
val lhsPlanRes = walkLogicalPlanTree(logicalPlan.lhs, lhsQueryContext)
val rhsPlanRes = walkLogicalPlanTree(logicalPlan.rhs, rhsQueryContext)

val execPlan = if (logicalPlan.operator.isInstanceOf[SetOperator])
SetOperatorExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
SetOperatorExec(qContext, inProcessPlanDispatcher, lhsPlanRes.plans, rhsPlanRes.plans, logicalPlan.operator,
logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn), datasetMetricColumn,
rvRangeFromPlan(logicalPlan))
else
BinaryJoinExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
BinaryJoinExec(qContext, inProcessPlanDispatcher, lhsPlanRes.plans, rhsPlanRes.plans, logicalPlan.operator,
logicalPlan.cardinality,
logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn),
Expand Down Expand Up @@ -429,12 +429,12 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
// there are any, the provided aggregation needs to be done using inProcess, else we can materialize the aggregate
// using the wrapped planner
val plan = if (LogicalPlanUtils.hasDescendantAggregateOrJoin(aggregate.vectors)) {
val childPlan = materialize(aggregate.vectors, queryContext)
val childPlanRes = walkLogicalPlanTree(aggregate.vectors, queryContext)
// We are here because we have descendent aggregate, if that was multi-partition, the dispatcher will
// be InProcessPlanDispatcher and adding the current aggregate using addAggregate will use the same dispatcher
// If the underlying plan however is not multi partition, adding the aggregator using addAggregator will
// use the same dispatcher
addAggregator(aggregate, queryContext, PlanResult(Seq(childPlan)))
addAggregator(aggregate, queryContext, childPlanRes)
} else {
val execPlans = generateExecWithoutRegex(aggregate,
LogicalPlan.getNonMetricShardKeyFilters(aggregate, dataset.options.nonMetricShardColumns).head, queryContext)
Expand Down

0 comments on commit 7fa0f0c

Please sign in to comment.