fix(query) Long lookback queries with or vector() operations fail to materialize (#1547)
amolnayak311 committed Apr 1, 2023
1 parent 59ffa04 commit 4920c0d
Showing 2 changed files with 48 additions and 2 deletions.
@@ -102,8 +102,13 @@ import filodb.query.exec._
// the question is only from which cluster, raw or downsample, to pull potentially partial data
// how does copyLogicalPlan with an updated end change the meaning of these queries entirely?
// Why not get rid of this else altogether and just send the original logical plan to the downsample cluster?

+ // latestDownsampleTimestampFn + offsetMillis.min gives a time in milliseconds, while some of our plans,
+ // like ScalarFixedDoublePlan, accept times in seconds. Thus, for cases like sum(rate(foo{}[longtime])) or vector(0),
+ // the mismatch in end time between the LHS and RHS causes the BinaryJoin object creation, and even plan
+ // materialization, to fail.
  copyLogicalPlanWithUpdatedTimeRange(periodicSeriesPlan,
-   TimeRange(periodicSeriesPlan.startMs, latestDownsampleTimestampFn + offsetMillis.min))
+   TimeRange(periodicSeriesPlan.startMs, (latestDownsampleTimestampFn + offsetMillis.min) / 1000 * 1000))
}
logger.debug("materializing against downsample cluster:: {}", qContext.origQueryParams)
downsampleClusterPlanner.materialize(downsampleLp, qContext)
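
The comment added in this hunk captures the root cause: latestDownsampleTimestampFn + offsetMillis.min is a millisecond timestamp, while seconds-based plans such as ScalarFixedDoublePlan truncate to whole seconds, so the two sides of the binary join only agree on the end time when the millisecond value is already second-aligned. A minimal standalone sketch of the arithmetic (not FiloDB code; the timestamp value is illustrative):

object SecondBoundarySketch extends App {
  val endMs = 1634755730246L            // e.g. latestDownsampleTimestampFn + offsetMillis.min, not second-aligned

  val lhsEndMs = endMs                  // a millis-based plan keeps the raw millis
  val rhsEndMs = (endMs / 1000) * 1000  // a seconds-based plan round-trips ms -> s -> ms
  println(lhsEndMs == rhsEndMs)         // false: the join's LHS and RHS end times disagree

  val snappedMs = endMs / 1000 * 1000   // the fix: floor to a second boundary once, up front
  println(snappedMs == (snappedMs / 1000) * 1000)  // true: both sides now agree
}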
@@ -1530,4 +1530,45 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
validatePlan(ep, expectedPlan)

}
}

it("Materializing Binary join with or vector(0) with long look back should not throw error") {
val now = 1634777330123L
val start = now / 1000 - 8.days.toSeconds
val step = 1.minute.toSeconds
val end = now / 1000 - 2.minutes.toSeconds

val rawRetention = 7.days // 7 days
val downsampleRetention = 183.days
val earliestRawTime = now - rawRetention.toMillis
val earliestDownSampleTime = now - downsampleRetention.toMillis
// making sure that the latestDownsampleTime is not a multiple of 1000
val latestDownsampleTime = now - 6.hours.toMillis + 123

val query = """sum(rate(bar{job= "app"}[30d])) or vector(0)"""

val logicalPlan = Parser.queryRangeToLogicalPlan(query,
TimeStepParams(start, step, end))
.asInstanceOf[PeriodicSeriesPlan]

val rawPlanner = new SingleClusterPlanner(dataset, schemas, mapperRef,
earliestRetainedTimestampFn = earliestRawTime, queryConfig, "raw")
val downsamplePlanner = new SingleClusterPlanner(dataset, schemas, mapperRef,
earliestRetainedTimestampFn = earliestDownSampleTime, queryConfig, "downsample")
val longTermPlanner = new LongTimeRangePlanner(rawPlanner, downsamplePlanner,
earliestRawTime, latestDownsampleTime, disp, queryConfig, dataset)

val ep = longTermPlanner.materialize(logicalPlan, QueryContext(origQueryParams = promQlQueryParams))

val expected = """T~InstantVectorFunctionMapper(function=OrVectorDouble)
|-FA1~StaticFuncArgs(0.0,RangeParams(1634086130,60,1634755730))
|-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634086130,60,1634755730))
|--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1823110189],downsample)
|---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|----T~PeriodicSamplesMapper(start=1634086130000, step=60000, end=1634755730000, window=Some(2592000000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(1631494130000,1634755730000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1823110189],downsample)
|---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|----T~PeriodicSamplesMapper(start=1634086130000, step=60000, end=1634755730000, window=Some(2592000000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(1631494130000,1634755730000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1823110189],downsample)""".stripMargin
validatePlan(ep, expected)
}
}
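
A quick standalone check (values copied from the test above; not part of the spec) that the expected plan's end time follows from the floor-to-second fix:

import scala.concurrent.duration._

object EndTimeCheck extends App {
  val now = 1634777330123L
  val latestDownsampleTime = now - 6.hours.toMillis + 123  // 1634755730246, deliberately not second-aligned
  val flooredEndMs = latestDownsampleTime / 1000 * 1000
  assert(flooredEndMs == 1634755730000L)                   // matches end=1634755730000 in the PeriodicSamplesMapper
  assert(flooredEndMs / 1000 == 1634755730L)               // matches the RangeParams end of 1634755730 seconds
  println("expected plan end time checks out")
}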
