feat(query): Adding quota parameters for individual query limits and further reporting. (#1548)

* Adding quota parameters for individual query quotas and further reporting.

* addressing changes requested

* addressing comments, adding number of scanned time series property

---------

Co-authored-by: Kier Petrov <kpetrov@apple.com>
kvpetrov and Kier Petrov committed Apr 7, 2023
1 parent 4920c0d commit c8ab83f
Showing 24 changed files with 462 additions and 312 deletions.
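In short, this commit replaces the flat limit fields on PlannerParams (sampleLimit, groupByCardLimit, joinQueryCardLimit, resultByteLimit) with a PerQueryLimits case class carried in two flavors: enforcedLimits, whose breach fails the query, and warnLimits, which presumably only triggers reporting. It also adds queryOrigin, queryOriginId and queryPrincipal for attribution. A minimal sketch of how a caller might wire the new limits, matching the QueryContext.scala diff below; the application id and limit values are illustrative:

    import filodb.core.query.{PerQueryLimits, PlannerParams, QueryContext}

    // Hard caps: exceeding these fails the query (e.g. with BadQueryException).
    val enforced = PerQueryLimits(
      execPlanSamples = 200000,    // cap on samples an ExecPlan may return
      timeSeriesScanned = 100000)  // cap on number of time series scanned
    val ctx = QueryContext(plannerParams = PlannerParams(
      applicationId = "my-app",    // hypothetical caller id
      enforcedLimits = enforced,
      warnLimits = PerQueryLimits.defaultWarnLimits()))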
@@ -185,7 +185,8 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
def query(memStore: TimeSeriesMemStore): Future[QueryResponse] = {
val colFilters = seriesTags.map { case (t, v) => ColumnFilter(t.toString, Equals(v.toString)) }.toSeq
val queryFilters = colFilters :+ ColumnFilter("_metric_", Equals(gaugeName))
-    val exec = MultiSchemaPartitionsExec(QueryContext(plannerParams = PlannerParams(sampleLimit = numSamples * 2)),
+    val exec = MultiSchemaPartitionsExec(
+      QueryContext(plannerParams = PlannerParams(enforcedLimits = PerQueryLimits(execPlanSamples = numSamples * 2))),
InProcessPlanDispatcher(QueryConfig.unitTestingQueryConfig), dataset.ref, 0, queryFilters, TimeRangeChunkScan(firstSampleTime, firstSampleTime + 2 *
numSamples), "_metric_")
val queryConfig = QueryConfig(config.getConfig("query"))
6 changes: 4 additions & 2 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -476,8 +476,10 @@ object CliMain extends StrictLogging {
val spreadProvider: Option[SpreadProvider] = options.spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))

val qOpts = QueryContext(origQueryParams = tsdbQueryParams,
-      plannerParams = PlannerParams(applicationId = "filodb-cli", spreadOverride = spreadProvider,
-        sampleLimit = options.sampleLimit, queryTimeoutMillis = options.timeout.toMillis.toInt,
+      plannerParams = PlannerParams(
+        applicationId = "filodb-cli", spreadOverride = spreadProvider,
+        enforcedLimits = PerQueryLimits(execPlanSamples = options.sampleLimit),
+        queryTimeoutMillis = options.timeout.toMillis.toInt,
shardOverrides = options.shardOverrides))
println(s"Sending query command to server for $ref with options $qOpts...")
println(s"Query Plan:\n$plan")

Large diffs in 2 files are not rendered by default.

38 changes: 33 additions & 5 deletions core/src/main/scala/filodb.core/query/QueryContext.scala
@@ -22,26 +22,54 @@ case class PromQlQueryParams(promQl: String, startSecs: Long, stepSecs: Long, en

case object UnavailablePromQlQueryParams extends TsdbQueryParams

+case class PerQueryLimits(
+  execPlanSamples: Int = 1000000,                  // Limit on ExecPlan results in samples, default is 1M
+  execPlanResultBytes: Long = 18000000,            // Limit on ExecPlan results in bytes, default is 18MB
+  groupByCardinality: Int = 100000,                // Limit on "group by" clause results, default is 100K
+  joinQueryCardinality: Int = 100000,              // Limit on binary join results, default is 100K
+  timeSeriesSamplesScannedBytes: Long = 300000000, // Limit on max data scanned per shard, default is 300 MB
+  timeSeriesScanned: Int = 1000000)                // Limit on max number of time series scanned, default is 1M
+
+object PerQueryLimits {
+
+  def defaultEnforcedLimits(): PerQueryLimits = {
+    PerQueryLimits()
+  }
+
+  def defaultWarnLimits(): PerQueryLimits = {
+    PerQueryLimits(
+      execPlanSamples = 50000,
+      execPlanResultBytes = 15000000,
+      groupByCardinality = 50000,
+      joinQueryCardinality = 50000,
+      timeSeriesSamplesScannedBytes = 150000,
+      timeSeriesScanned = 500000
+    )
+  }
+
+}
case class PlannerParams(applicationId: String = "filodb",
spread: Option[Int] = None,
spreadOverride: Option[SpreadProvider] = None,
shardOverrides: Option[Seq[Int]] = None,
targetSchemaProviderOverride: Option[TargetSchemaProvider] = None,
queryTimeoutMillis: Int = 60000, // set default to match default http-request-timeout
-  sampleLimit: Int = 1000000,
-  groupByCardLimit: Int = 100000,
-  joinQueryCardLimit: Int = 100000,
-  resultByteLimit: Long = 18000000, // 18MB
+  enforcedLimits: PerQueryLimits = PerQueryLimits.defaultEnforcedLimits(),
+  warnLimits: PerQueryLimits = PerQueryLimits.defaultWarnLimits(),
+  queryOrigin: Option[String] = None, // alert/dashboard/rr/api/etc
+  queryOriginId: Option[String] = None, // an ID of rr/alert
+  queryPrincipal: Option[String] = None, // user, entity initiating query
timeSplitEnabled: Boolean = false,
minTimeRangeForSplitMs: Long = 1.day.toMillis,
splitSizeMs: Long = 1.day.toMillis,
skipAggregatePresent: Boolean = false,
processFailure: Boolean = true,
processMultiPartition: Boolean = false,
allowPartialResults: Boolean = false)

object PlannerParams {
def apply(constSpread: Option[SpreadProvider], sampleLimit: Int): PlannerParams =
-    PlannerParams(spreadOverride = constSpread, sampleLimit = sampleLimit)
+    PlannerParams(spreadOverride = constSpread, enforcedLimits = PerQueryLimits(execPlanSamples = sampleLimit))
def apply(constSpread: Option[SpreadProvider], partialResults: Boolean): PlannerParams =
PlannerParams(spreadOverride = constSpread, allowPartialResults = partialResults)
}
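Migration for existing call sites is mechanical; a sketch with illustrative values:

    import filodb.core.query.{PerQueryLimits, PlannerParams}

    // Before this commit:  PlannerParams(sampleLimit = 50000)
    // After it, the same cap is expressed through enforcedLimits:
    val pp = PlannerParams(enforcedLimits = PerQueryLimits(execPlanSamples = 50000))

    // The two-argument apply keeps its old (spread, sampleLimit) signature,
    // so call sites like this should still compile unchanged:
    val pp2 = PlannerParams(None, 50000)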
34 changes: 22 additions & 12 deletions grpc/src/main/protobuf/query_service.proto
@@ -20,21 +20,31 @@ message QueryParams {
optional string remoteQueryPath = 8;
}

+message PerQueryLimits {
+  optional uint32 execPlanSamples = 1;               // Limit on ExecPlan results in samples
+  optional uint64 execPlanResultBytes = 2;           // Limit on ExecPlan results in bytes
+  optional uint32 groupByCardinality = 3;            // Limit on "group by" clause results
+  optional uint32 joinQueryCardinality = 4;          // Limit on binary join results
+  optional uint64 timeSeriesSamplesScannedBytes = 5; // Limit on max data scanned per shard
+  optional uint32 timeSeriesScanned = 6;             // Limit on max time series scanned
+}

message PlannerParams {
optional string applicationId = 1;
optional uint32 queryTimeoutMillis = 2;
-  optional uint32 sampleLimit = 3;
-  optional uint32 groupByCardLimit = 4;
-  optional uint32 joinQueryCardLimit = 5;
-  optional uint64 resultByteLimit = 6;
-  optional bool timeSplitEnabled = 7;
-  optional uint64 minTimeRangeForSplitMs = 8;
-  optional uint64 splitSizeMs = 9;
-  optional bool skipAggregatePresent = 10;
-  optional bool processFailure = 11;
-  optional bool processMultiPartition = 12;
-  optional bool allowPartialResults = 13;
-  optional bool histogramMap = 14;
+  optional PerQueryLimits enforcedLimits = 3;
+  optional PerQueryLimits warnLimits = 4;
+  optional string queryOrigin = 5;
+  optional string queryOriginId = 6;
+  optional string queryPrincipal = 7;
+  optional bool timeSplitEnabled = 8;
+  optional uint64 minTimeRangeForSplitMs = 9;
+  optional uint64 splitSizeMs = 10;
+  optional bool skipAggregatePresent = 11;
+  optional bool processFailure = 12;
+  optional bool processMultiPartition = 13;
+  optional bool allowPartialResults = 14;
+  optional bool histogramMap = 15;
}

message Request {
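Every field stays optional on the wire, so a client that sets none of the new fields inherits the server-side defaults. A sketch of how a gRPC caller might populate the new message from Scala, using the Java builders generated from this proto (the generated package path is an assumption):

    val limitsProto = GrpcMultiPartitionQueryService.PerQueryLimits.newBuilder()
      .setExecPlanSamples(200000)     // illustrative values
      .setTimeSeriesScanned(100000)
      .build()
    val plannerProto = GrpcMultiPartitionQueryService.PlannerParams.newBuilder()
      .setApplicationId("my-app")     // hypothetical caller id
      .setEnforcedLimits(limitsProto) // warnLimits deliberately left unset
      .build()
    // Unset fields fall back to defaults in fromProto (see ProtoConverters below).

Note that tags 3 through 6 are reassigned from the old scalar limits to the new fields, so old and new clients must agree on the proto version.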
4 changes: 2 additions & 2 deletions jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala
@@ -19,7 +19,7 @@ import filodb.core.{MachineMetricsData, MetricsTestData, SpreadChange, TestData}
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.memstore._
import filodb.core.metadata.Schemas
-import filodb.core.query.{PlannerParams, QueryConfig, QueryContext, QuerySession}
+import filodb.core.query.{PerQueryLimits, PlannerParams, QueryConfig, QueryContext, QuerySession}
import filodb.core.store._
import filodb.memory.MemFactory
import filodb.memory.format.SeqRowReader
@@ -91,7 +91,7 @@ class HistogramQueryBenchmark {

// Single-threaded query test
val numQueries = 500
-    QueryContext(plannerParams= PlannerParams(sampleLimit = 1000))
+    QueryContext(plannerParams = PlannerParams(enforcedLimits = PerQueryLimits(execPlanSamples = 1000)))
QueryContext(plannerParams = PlannerParams(shardOverrides = Some(Seq(0))))

val qContext = QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, 1))), 100).
4 changes: 2 additions & 2 deletions jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala
@@ -18,7 +18,7 @@ import filodb.core.SpreadChange
import filodb.core.binaryrecord2.RecordContainer
import filodb.core.memstore.{SomeData, TimeSeriesMemStore}
import filodb.core.metadata.Schemas
-import filodb.core.query.{PlannerParams, QueryContext}
+import filodb.core.query.{PerQueryLimits, PlannerParams, QueryContext}
import filodb.core.store.StoreConfig
import filodb.gateway.GatewayServer
import filodb.gateway.conversion.PrometheusInputRecord
@@ -122,7 +122,7 @@ class QueryAndIngestBenchmark extends StrictLogging {
println(s"Initial ingestion ended, indexes set up")
val qContext = QueryContext(plannerParams =
new PlannerParams(spreadOverride = Some(StaticSpreadProvider(SpreadChange(0, spread))),
-        sampleLimit = 1000000,
+        enforcedLimits = PerQueryLimits(execPlanSamples = 1000000),
queryTimeoutMillis = 2.hours.toMillis.toInt)) // high timeout since we are using same context for all queries

/**
4 changes: 2 additions & 2 deletions jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala
@@ -20,7 +20,7 @@ import filodb.core.SpreadChange
import filodb.core.binaryrecord2.RecordContainer
import filodb.core.memstore.{SomeData, TimeSeriesMemStore}
import filodb.core.metadata.Schemas
-import filodb.core.query.{PlannerParams, QueryConfig, QueryContext, QuerySession}
+import filodb.core.query.{PerQueryLimits, PlannerParams, QueryConfig, QueryContext, QuerySession}
import filodb.core.store.StoreConfig
import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
@@ -194,7 +194,7 @@ class QueryInMemoryBenchmark extends StrictLogging {
// Single-threaded query test
val qContext = QueryContext(plannerParams =
new PlannerParams(spreadOverride = Some(StaticSpreadProvider(SpreadChange(0, spread))),
-        sampleLimit = 10000,
+        enforcedLimits = PerQueryLimits(execPlanSamples = 10000),
queryTimeoutMillis = 2.hours.toMillis.toInt)) // high timeout since we are using same context for all queries
val logicalPlan = Parser.queryRangeToLogicalPlan(rawQuery, qParams)
// Pick the children nodes, not the LocalPartitionDistConcatExec. Thus we can run in a single thread this way
77 changes: 69 additions & 8 deletions query/src/main/scala/filodb/query/ProtoConverters.scala
@@ -191,13 +191,16 @@ object ProtoConverters {

implicit class PlannerParamsToProtoConverter(pp: PlannerParams) {
def toProto: GrpcMultiPartitionQueryService.PlannerParams = {
+      val enforcedLimits = pp.enforcedLimits.toProto
+      val warnLimits = pp.warnLimits.toProto
val builder = GrpcMultiPartitionQueryService.PlannerParams.newBuilder()
builder.setApplicationId(pp.applicationId)
builder.setQueryTimeoutMillis(pp.queryTimeoutMillis)
-      builder.setSampleLimit(pp.sampleLimit)
-      builder.setGroupByCardLimit(pp.groupByCardLimit)
-      builder.setJoinQueryCardLimit(pp.joinQueryCardLimit)
-      builder.setResultByteLimit(pp.resultByteLimit)
+      builder.setEnforcedLimits(enforcedLimits)
+      builder.setWarnLimits(warnLimits)
+      pp.queryOrigin.foreach(qo => builder.setQueryOrigin(qo))
+      pp.queryOriginId.foreach(qoi => builder.setQueryOriginId(qoi))
+      pp.queryPrincipal.foreach(qp => builder.setQueryPrincipal(qp))
builder.setTimeSplitEnabled(pp.timeSplitEnabled)
builder.setMinTimeRangeForSplitMs(pp.minTimeRangeForSplitMs)
builder.setSplitSizeMs(pp.splitSizeMs)
@@ -211,14 +214,18 @@

implicit class PlannerParamsFromProtoConverter(gpp: GrpcMultiPartitionQueryService.PlannerParams) {
def fromProto: PlannerParams = {
+      val enforcedLimits = gpp.getEnforcedLimits.fromProto(PerQueryLimits.defaultEnforcedLimits())
+      val warnLimits = gpp.getWarnLimits.fromProto(PerQueryLimits.defaultWarnLimits())
val pp = PlannerParams()

pp.copy(
applicationId = if (gpp.hasApplicationId) gpp.getApplicationId else pp.applicationId,
queryTimeoutMillis = if (gpp.hasQueryTimeoutMillis) gpp.getQueryTimeoutMillis else pp.queryTimeoutMillis,
-        sampleLimit = if (gpp.hasSampleLimit) gpp.getSampleLimit else pp.sampleLimit,
-        groupByCardLimit = if (gpp.hasGroupByCardLimit) gpp.getGroupByCardLimit else pp.groupByCardLimit,
-        joinQueryCardLimit = if (gpp.hasJoinQueryCardLimit) gpp.getJoinQueryCardLimit else pp.joinQueryCardLimit,
-        resultByteLimit = if (gpp.hasResultByteLimit) gpp.getResultByteLimit else pp.resultByteLimit,
+        enforcedLimits = enforcedLimits,
+        warnLimits = warnLimits,
+        queryOrigin = if (gpp.hasQueryOrigin) Option(gpp.getQueryOrigin) else None,
+        queryOriginId = if (gpp.hasQueryOriginId) Option(gpp.getQueryOriginId) else None,
+        queryPrincipal = if (gpp.hasQueryPrincipal) Option(gpp.getQueryPrincipal) else None,
timeSplitEnabled = if (gpp.hasTimeSplitEnabled) gpp.getTimeSplitEnabled else pp.timeSplitEnabled,
minTimeRangeForSplitMs = if (gpp.hasMinTimeRangeForSplitMs) gpp.getMinTimeRangeForSplitMs
else pp.minTimeRangeForSplitMs,
@@ -233,6 +240,60 @@
}
}

+  implicit class PerQueryLimitsToProtoConverter(sq: PerQueryLimits) {
+    def toProto: GrpcMultiPartitionQueryService.PerQueryLimits = {
+      val quotaBuilder = GrpcMultiPartitionQueryService.PerQueryLimits.newBuilder()
+      quotaBuilder.setExecPlanSamples(sq.execPlanSamples)
+      quotaBuilder.setExecPlanResultBytes(sq.execPlanResultBytes)
+      quotaBuilder.setGroupByCardinality(sq.groupByCardinality)
+      quotaBuilder.setJoinQueryCardinality(sq.joinQueryCardinality)
+      quotaBuilder.setTimeSeriesSamplesScannedBytes(sq.timeSeriesSamplesScannedBytes)
+      quotaBuilder.setTimeSeriesScanned(sq.timeSeriesScanned)
+      quotaBuilder.build()
+    }
+  }
+  implicit class PerQueryLimitsFromProtoConverter(giq: GrpcMultiPartitionQueryService.PerQueryLimits) {
+    def fromProto(): PerQueryLimits = {
+      val q = PerQueryLimits()
+      fromProto(q)
+    }
+    def fromProto(defaultQ: PerQueryLimits): PerQueryLimits = {
+      val limits = defaultQ.copy(
+        execPlanSamples =
+          if (giq.hasExecPlanSamples) giq.getExecPlanSamples
+          else defaultQ.execPlanSamples,
+        execPlanResultBytes =
+          if (giq.hasExecPlanResultBytes) giq.getExecPlanResultBytes
+          else defaultQ.execPlanResultBytes,
+        groupByCardinality =
+          if (giq.hasGroupByCardinality) giq.getGroupByCardinality
+          else defaultQ.groupByCardinality,
+        joinQueryCardinality =
+          if (giq.hasJoinQueryCardinality) giq.getJoinQueryCardinality
+          else defaultQ.joinQueryCardinality,
+        timeSeriesSamplesScannedBytes =
+          if (giq.hasTimeSeriesSamplesScannedBytes) giq.getTimeSeriesSamplesScannedBytes
+          else defaultQ.timeSeriesSamplesScannedBytes,
+        timeSeriesScanned =
+          if (giq.hasTimeSeriesScanned) giq.getTimeSeriesScanned
+          else defaultQ.timeSeriesScanned
+      )
+      limits
+    }
+  }

implicit class StatToProtoConverter(stat: Stat) {
def toProto: GrpcMultiPartitionQueryService.Stat = {
val builder = GrpcMultiPartitionQueryService.Stat.newBuilder()
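The converters above are symmetric, so limits survive a round trip; a minimal sketch, assuming the implicit classes are imported from filodb.query.ProtoConverters:

    import filodb.core.query.PerQueryLimits
    import filodb.query.ProtoConverters._

    val original = PerQueryLimits(execPlanSamples = 42)
    // toProto sets all six fields, so fromProto rebuilds an equal value;
    // fields missing on the wire fall back to the supplied defaults instead.
    val roundTripped = original.toProto.fromProto(PerQueryLimits.defaultEnforcedLimits())
    assert(roundTripped == original)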
@@ -49,7 +49,7 @@ trait ReduceAggregateExec extends NonLeafExecPlan {
else {
val aggregator = RowAggregator(aggrOp, aggrParams, schema)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = true, results, rv => rv.key,
-        querySession.qContext.plannerParams.groupByCardLimit, queryContext)
+        querySession.qContext.plannerParams.enforcedLimits.groupByCardinality, queryContext)
}
}
Observable.fromTask(task).flatten
@@ -140,11 +140,11 @@ final case class AggregateMapReduce(aggrOp: AggregationOperator,
RangeVectorAggregator.fastReduce(aggregator, false, source, numWindows)
}.getOrElse {
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping,
-          querySession.qContext.plannerParams.groupByCardLimit, querySession.qContext)
+          querySession.qContext.plannerParams.enforcedLimits.groupByCardinality, querySession.qContext)
}
} else {
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping,
-        querySession.qContext.plannerParams.groupByCardLimit, querySession.qContext)
+        querySession.qContext.plannerParams.enforcedLimits.groupByCardinality, querySession.qContext)
}
}

13 changes: 8 additions & 5 deletions query/src/main/scala/filodb/query/exec/BinaryJoinExec.scala
@@ -73,9 +73,11 @@ final case class BinaryJoinExec(queryContext: QueryContext,
val span = Kamon.currentSpan()
val taskOfResults = childResponses.map {
case (QueryResult(_, _, result, _, _, _), _)
-        if (result.size > queryContext.plannerParams.joinQueryCardLimit && cardinality == Cardinality.OneToOne) =>
-          throw new BadQueryException(s"The join in this query has input cardinality of ${result.size} which" +
-            s" is more than limit of ${queryContext.plannerParams.joinQueryCardLimit}." +
+        if (
+          result.size > queryContext.plannerParams.enforcedLimits.joinQueryCardinality &&
+          cardinality == Cardinality.OneToOne
+        ) => throw new BadQueryException(s"The join in this query has input cardinality of ${result.size} which" +
+          s" is more than limit of ${queryContext.plannerParams.enforcedLimits.joinQueryCardinality}." +
s" Try applying more filters or reduce time range.")
case (QueryResult(_, _, result, _, _, _), i) => (result, i)
}.toListL.map { resp =>
Expand Down Expand Up @@ -136,9 +138,10 @@ final case class BinaryJoinExec(queryContext: QueryContext,
}

// OneToOne cardinality case is already handled. this condition handles OneToMany case
-      if (results.size >= queryContext.plannerParams.joinQueryCardLimit)
+      if (results.size >= queryContext.plannerParams.enforcedLimits.joinQueryCardinality)
        throw new BadQueryException(s"The result of this join query has cardinality ${results.size} and has " +
-          s"reached the limit of ${queryContext.plannerParams.joinQueryCardLimit}. Try applying more filters.")
+          s"reached the limit of ${queryContext.plannerParams.enforcedLimits.joinQueryCardinality}. " +
+          s"Try applying more filters.")

val res = if (lhsIsOneSide) binOp(rvOne.rows, rvOtherCorrect.rows)
else binOp(rvOtherCorrect.rows, rvOne.rows)
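A sketch of how the enforced join limit surfaces to callers; the exec-plan setup is elided and the tightened value is illustrative:

    import filodb.core.query.{PerQueryLimits, PlannerParams, QueryContext}

    // Tighten the join cardinality limit to one to exercise the failure path.
    val strictCtx = QueryContext(plannerParams =
      PlannerParams(enforcedLimits = PerQueryLimits(joinQueryCardinality = 1)))
    // A BinaryJoinExec built with strictCtx and more than one matching input
    // series should now fail with a BadQueryException such as:
    //   "The join in this query has input cardinality of 2 which is more than
    //    limit of 1. Try applying more filters or reduce time range."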
