Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): pushdown joins only if lhs/rhs shard keys are identical #1513

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,24 @@ class SingleClusterPlanner(val dataset: Dataset,
}
// scalastyle:on cyclomatic.complexity

/**
* Return the target-schema labels (if any) that will be used to materialize a RawSeries.
*/
private def getTargetSchemaLabelsFromRawSeries(rs: RawSeries, qContext: QueryContext): Option[Seq[String]] = {
val tsp = targetSchemaProvider(qContext)
val filters = getColumnFilterGroup(rs).map(_.toSeq)
// Get time params from the RangeSelector, and use them to identify a TargetSchemaChanges.
val startEndMsPairOpt = rs.rangeSelector match {
case is: IntervalSelector => Some((is.from, is.to))
case _ => None
}
val tsFunc = tsp.targetSchemaFunc(filters.head)
startEndMsPairOpt match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startEndMsPairOpt.map() is cleaner than pattern match that return None for None

case Some((startMs, endMs)) => findTargetSchema(tsFunc, startMs, endMs).map(_.schema)
case _ => None
}
}

// scalastyle:off method.length
/**
* Returns the shards spanned by a LogicalPlan's data.
Expand All @@ -387,18 +405,11 @@ class SingleClusterPlanner(val dataset: Dataset,
assert(filters.size == 1, s"expected leaf plan to yield single filter group, but got ${filters.size}")
leafPlan match {
case rs: RawSeries =>
// Get time params from the RangeSelector, and use them to identify a TargetSchemaChanges.
val startEndMsPairOpt = rs.rangeSelector match {
case is: IntervalSelector => Some((is.from, is.to))
case _ => None
}
val tsLabels: Option[Seq[String]] = {
val tsFunc = tsp.targetSchemaFunc(filters.head)
startEndMsPairOpt match {
case Some((startMs, endMs)) => findTargetSchema(tsFunc, startMs, endMs).map(_.schema)
case _ => None
}
}
val tsLabels: Option[Seq[String]] = getTargetSchemaLabelsFromRawSeries(rs, qContext)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a method? This lead to code duplication and doing map instead of pattern match will be concise as well

val (startMs, endMs): (Long, Long) = startEndMsPairOpt.getOrElse((0, Long.MaxValue))
leafInfos.append(LeafInfo(leafPlan, startMs, endMs, renameMetricFilter(filters.head), tsLabels))
// Do nothing; not pulling data from any shards.
Expand Down Expand Up @@ -435,7 +446,10 @@ class SingleClusterPlanner(val dataset: Dataset,
*/
private def getPushdownShards(qContext: QueryContext,
lp: LogicalPlan): Option[Set[Int]] = {
def helper(lp: LogicalPlan): Option[Set[Int]] = lp match {
case class PushdownData(shards: Set[Int],
shardKeys: Map[String, String],
tschemaLabels: Set[String])
def helper(lp: LogicalPlan): Option[PushdownData] = lp match {
// VectorPlans can't currently be pushed down. Consider:
// foo{...} or vector(0)
// If foo{...} is sharded with spread=1, but both shards contain no foo{...} data,
Expand All @@ -461,49 +475,61 @@ class SingleClusterPlanner(val dataset: Dataset,
case aif: ApplyInstantFunction => helper(aif.vectors)
case bj: BinaryJoin =>
// lhs/rhs must reside on the same set of shards, and target schema labels for all leaves must be
// discoverable, equal, and preserved by join keys
val lhsShards = helper(bj.lhs)
val rhsShards = helper(bj.rhs)
val canPushdown = lhsShards != None && rhsShards != None &&
// either the shard groups are equal, or either of lhs/rhs includes only scalars.
(lhsShards == rhsShards || lhsShards.get.isEmpty || rhsShards.get.isEmpty) &&
{
val targetSchemaLabels =
getUniversalTargetSchemaLabels(bj, targetSchemaProvider(qContext))
targetSchemaLabels.isDefined &&
targetSchemaLabels.get.toSet.diff(shardColumns.toSet).subsetOf(bj.on.toSet)
}
// union lhs/rhs shards, since one might be empty (if it's a scalar)
if (canPushdown) Some(lhsShards.get.union(rhsShards.get)) else None
// discoverable, equal, and preserved by join keys.
val lhsData = helper(bj.lhs)
val rhsData = helper(bj.rhs)
val canPushdown = lhsData.isDefined && rhsData.isDefined && {
val hasScalar = lhsData.get.shards.isEmpty || rhsData.get.shards.isEmpty
val canPushdownJoin = lhsData.get.shards == rhsData.get.shards &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the order of Seq containing shards granted to be ordered consistently? Seq(1, 2, 3) != Seq(3, 2, 1) though you want to push down.

Also too much going on here, some comments would really be helpful to understand the intent.

lhsData.get.shardKeys == rhsData.get.shardKeys &&
lhsData.get.tschemaLabels == rhsData.get.tschemaLabels &&
lhsData.get.tschemaLabels.diff(shardColumns.toSet).subsetOf(bj.on.toSet)
hasScalar || canPushdownJoin
}
if (canPushdown) {
// account for empty pushdown shards (i.e. lhs/rhs is a scalar)
if (lhsData.get.shards.nonEmpty) lhsData else rhsData
} else None
case agg: Aggregate =>
// target schema labels for all leaves must be discoverable, equal, and preserved by join keys
val shards = helper(agg.vectors)
val canPushdown = shards != None &&
val pushdownData = helper(agg.vectors)
val canPushdown = pushdownData.isDefined &&
agg.clauseOpt.isDefined &&
agg.clauseOpt.get.clauseType == AggregateClause.ClauseType.By &&
{
val targetSchemaLabels =
getUniversalTargetSchemaLabels(agg, targetSchemaProvider(qContext))
targetSchemaLabels.isDefined &&
{
val byLabels = agg.clauseOpt.get.labels
targetSchemaLabels.get.toSet.diff(shardColumns.toSet).subsetOf(byLabels.toSet)
}
pushdownData.get.tschemaLabels.diff(shardColumns.toSet).subsetOf(byLabels.toSet)
}
if (canPushdown) shards else None
if (canPushdown) pushdownData else None
case nl: NonLeafLogicalPlan =>
// Pessimistically require that this entire subtree lies on one shard (or none, if all scalars).
val shardGroups = nl.children.map(helper(_))
if (shardGroups.forall(_.isDefined)) {
val shards = shardGroups.flatMap(_.get).toSet
// if zero elements, then all children are scalars
if (shards.size <= 1) Some(shards) else None
val pushdownDatas = nl.children.map(helper(_))
if (pushdownDatas.nonEmpty && pushdownDatas.forall(_.isDefined)) {
val referenceData = pushdownDatas.map(_.get)
.find(data => data.shards.nonEmpty)
.getOrElse(pushdownDatas.head.get)
val allDatasEqual = pushdownDatas.tail
.forall(data => data.get.shards.isEmpty || data.get == referenceData)
if (allDatasEqual) Some(referenceData) else None
} else None
case rs: RawSeries =>
val shards = qContext.plannerParams.shardOverrides.map(_.toSet)
.orElse(getShardSpanFromLp(rs, qContext))
val tschemaLabels = getTargetSchemaLabelsFromRawSeries(rs, qContext)
val shardKeyCols = dataset.options.nonMetricShardColumns
val shardKeys = getNonMetricShardKeyFilters(rs, shardKeyCols)
.flatten
.map(filter => filter.column -> filter.filter.valuesStrings.head.asInstanceOf[String])
.toMap
if (tschemaLabels.isDefined) {
Some(PushdownData(shards.get, shardKeys, tschemaLabels.map(_.toSet).getOrElse(Set.empty)))
} else None
case rs: RawSeries => getShardSpanFromLp(rs, qContext)
case sc: ScalarPlan => Some(Set.empty) // don't want a None to end shard-group propagation
// Don't want a Scalar to end PushdownData propagation, so we'll use
// Set.empty to signify propagation should continue.
case sc: ScalarPlan => Some(PushdownData(Set.empty, Map.empty, Set.empty))
case _ => None
}
helper(lp)
helper(lp).map(_.shards)
}
// scalastyle:on method.length
// scalastyle:on cyclomatic.complexity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,9 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
binaryJoinNode.asInstanceOf[BinaryJoinExec].rhs.size shouldEqual 4
}

it ("should pushdown BinaryJoins between different shard keys when shards are identical") {
it ("should not pushdown BinaryJoins between different shard keys") {
def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = {
// Spread across all shards.
Seq(SpreadChange(0, 5))
}
def targetSchema(filter: Seq[ColumnFilter]): Seq[TargetSchemaChange] = {
Expand All @@ -520,8 +521,7 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
spreadOverride = Some(FunctionalSpreadProvider(spread)),
targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)),
queryTimeoutMillis = 1000000)))
execPlan.isInstanceOf[LocalPartitionDistConcatExec] shouldEqual true
execPlan.children.forall(c => c.isInstanceOf[BinaryJoinExec])
execPlan.isInstanceOf[LocalPartitionDistConcatExec] shouldEqual false
}
}

Expand Down