Skip to content

Commit

Permalink
Adding new helper method to get columnfilter list along with leaf plan
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep6189 committed Apr 23, 2024
1 parent 7fa0f0c commit e7b6806
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import filodb.core.query.ColumnFilter
import filodb.core.query.Filter.Equals
import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query.LogicalPlan
import filodb.query.{LogicalPlan, ScalarFixedDoublePlan}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -58,4 +58,18 @@ class LogicalPlanUtilsSpec extends AnyFunSpec with Matchers {
}
getResult(differentCols) shouldEqual None
}

it ("getColumnFilterGroupWithLogical plans should return result as expected") {
val timeParamsSec = TimeStepParams(1000, 10, 10000)
val query1 = """sum(count_over_time((test_metric{_ws_="test-ws", _ns_="test-ns", usecase="test"} > 20000)[21600s:])) or vector(0)"""
val lp = Parser.queryRangeToLogicalPlan(query1, timeParamsSec)
val columnGroupWithPlan = LogicalPlan.getColumnFilterGroupWithLogicalPlan(lp)
val scalarPlansCount = columnGroupWithPlan.count(x => x.logicalPlan == ScalarFixedDoublePlan)
scalarPlansCount shouldEqual 2
val query2 = """rate(test_metric{_ns_="test-ns", _ws_="test-ns", cluster="test1"}[5m]) * 1000"""
val lp2 = Parser.queryRangeToLogicalPlan(query2, timeParamsSec)
val columnGroupWithPlan2 = LogicalPlan.getColumnFilterGroupWithLogicalPlan(lp2)
val scalarPlansCount2 = columnGroupWithPlan2.count(x => x.logicalPlan == ScalarFixedDoublePlan)
scalarPlansCount2 shouldEqual 1
}
}
31 changes: 31 additions & 0 deletions query/src/main/scala/filodb/query/LogicalPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan,
vectors = vectors.replacePeriodicSeriesFilters(filters))
}

case class ColumnFilterListWithLogicalPlan(columnFilterSet: Set[ColumnFilter], logicalPlan: Any)

object LogicalPlan {
/**
* Get leaf Logical Plans
Expand Down Expand Up @@ -761,6 +763,35 @@ object LogicalPlan {
}
}

/**
* Given a LogicalPlan, the function finds a Seq of all Child nodes, and returns a Set of ColumnFilters and the
* associated logical plan for each of the Leaf node
*
* @param logicalPlan the root LogicalPlan
* @return Seq[ColumnFilterListWithLogicalPlan], Seq has size same as the number of leaf nodes
*/
def getColumnFilterGroupWithLogicalPlan(logicalPlan: LogicalPlan): Seq[ColumnFilterListWithLogicalPlan] = {
LogicalPlan.findLeafLogicalPlans(logicalPlan) map { lp =>
lp match {
case lp: LabelValues => ColumnFilterListWithLogicalPlan(lp.filters toSet, LabelValues)
case lp: LabelNames => ColumnFilterListWithLogicalPlan(lp.filters toSet, LabelNames)
case lp: RawSeries => ColumnFilterListWithLogicalPlan(lp.filters toSet, RawSeries)
case lp: RawChunkMeta => ColumnFilterListWithLogicalPlan(lp.filters toSet, RawChunkMeta)
case lp: SeriesKeysByFilters => ColumnFilterListWithLogicalPlan(lp.filters toSet, SeriesKeysByFilters)
case lp: LabelCardinality => ColumnFilterListWithLogicalPlan(lp.filters.toSet, LabelCardinality)
case lp: TsCardinalities => ColumnFilterListWithLogicalPlan(lp.filters.toSet, TsCardinalities)
case _: ScalarTimeBasedPlan => ColumnFilterListWithLogicalPlan(Set.empty[ColumnFilter], ScalarTimeBasedPlan)
case _: ScalarFixedDoublePlan => ColumnFilterListWithLogicalPlan(Set.empty[ColumnFilter], ScalarFixedDoublePlan)
case _: ScalarBinaryOperation => ColumnFilterListWithLogicalPlan(Set.empty[ColumnFilter], ScalarBinaryOperation)
case _ => throw new BadQueryException(s"Invalid logical plan $logicalPlan")
}
} match {
case groupSeq: Seq[ColumnFilterListWithLogicalPlan] =>
if (groupSeq.isEmpty || groupSeq.forall( x => x.columnFilterSet.isEmpty)) Seq.empty else groupSeq
case _ => Seq.empty
}
}

def getRawSeriesFilters(logicalPlan: LogicalPlan): Seq[Seq[ColumnFilter]] = {
LogicalPlan.findLeafLogicalPlans(logicalPlan).map { l =>
l match {
Expand Down

0 comments on commit e7b6806

Please sign in to comment.