-
Notifications
You must be signed in to change notification settings - Fork 224
/
MultiSchemaPartitionsExec.scala
161 lines (138 loc) · 8.39 KB
/
MultiSchemaPartitionsExec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package filodb.query.exec
import kamon.Kamon
import monix.execution.Scheduler
import filodb.core.DatasetRef
import filodb.core.metadata.Schemas
import filodb.core.query.{ColumnFilter, QueryConfig, QueryContext, QuerySession, QueryWarnings}
import filodb.core.query.Filter.Equals
import filodb.core.store._
import filodb.query.Query.qLogger
/**
 * Thrown when a schema ID found during partition lookup cannot be resolved against the
 * currently-configured schemas. Per the message, this typically indicates a schema config
 * change without truncating the partitionkeys tables.
 *
 * @param id the unresolved schema ID encountered during the query (-1 when none was available)
 */
final case class UnknownSchemaQueryErr(id: Int) extends
Exception(s"Unknown schema ID $id during query. This likely means a schema config change happened and " +
"the partitionkeys tables were not truncated.")
/**
 * ExecPlan to select raw data from partitions that the given filter resolves to,
 * in the given shard, for the given row key range. Schema-agnostic - discovers the schema and
 * creates an inner SelectRawPartitionsExec with the right column IDs and other details depending on schema.
 * The inner SelectRawPartitionsExec may have modified transformers based on the discovered schema -
 * ex. some RangeFunctions are transformed for downsample gauge schemas.
 *
 * @param queryContext per-query context; also used here for the timeout check and raw-scan warn limits
 * @param dispatcher dispatcher used to execute this plan
 * @param dataset reference of the dataset to scan
 * @param shard the single shard this leaf plan scans
 * @param filters column filters selecting the partitions to read within the shard
 * @param chunkMethod chunk scan method (time range) passed through to partition lookup
 * @param metricColumn name of the column holding the metric name; used to find the metric filter and
 *                     to strip Prometheus histogram suffixes (_sum/_count) on lookup misses
 * @param schema an optional schema to filter partitions using. If not supplied, schema is discovered.
 * @param colName optional column name to select for querying. If not supplied, the default valueColumn from
 *                data schema definition is used. For downsampled gauges, column is automatically chosen.
 */
final case class MultiSchemaPartitionsExec(queryContext: QueryContext,
                                           dispatcher: PlanDispatcher,
                                           dataset: DatasetRef,
                                           shard: Int,
                                           filters: Seq[ColumnFilter],
                                           chunkMethod: ChunkScanMethod,
                                           metricColumn: String,
                                           schema: Option[String] = None,
                                           colName: Option[String] = None) extends LeafExecPlan {
  import SelectRawPartitionsExec._

  // Delegates to the transformers of the inner plan built by finalizePlan.
  // NOTE(review): finalPlan is only assigned in doExecute, so calling this earlier would NPE —
  // confirm callers only invoke this after execution has started.
  override def allTransformers: Seq[RangeVectorTransformer] = finalPlan.rangeVectorTransformers

  @transient // dont serialize the SelectRawPartitionsExec plan created for plan execution
  var finalPlan: SelectRawPartitionsExec = _

  // Remove _columnName suffix from metricName and generate PartLookupResult.
  // Supports Prometheus histogram compatibility: re-runs the partition lookup with the metric name
  // minus its "_<columnName>" suffix, and returns the new lookup result paired with the column to select.
  private def removeSuffixAndGenerateLookupResult(filters: Seq[ColumnFilter], metricName: String, columnName: String,
                                                  source: ChunkSource,
                                                  querySession: QuerySession) = {
    // Assume metric name has only equal filter
    val filterWithoutColumn = filters.filterNot(_.column == metricColumn) :+
      ColumnFilter(metricColumn, Equals(metricName.stripSuffix(s"_$columnName")))
    val partMethod = FilteredPartitionScan(ShardSplit(shard), filterWithoutColumn)
    // clear stats since previous call to lookupPartitions set the stat with metric name that has suffix
    querySession.queryStats.clear()
    val lookupRes = source.lookupPartitions(dataset, partMethod, chunkMethod, querySession)
    (lookupRes, Some(columnName))
  }

  // scalastyle:off method.length
  /**
   * Looks up matching partitions, determines the schema (supplied or discovered from the first
   * matching partition), and builds the concrete inner SelectRawPartitionsExec with exact column IDs
   * and (possibly rewritten) transformers. Returns an "empty" SelectRawPartitionsExec — no schema,
   * no columns — when no schema can be determined.
   *
   * @throws UnknownSchemaQueryErr if the discovered schema ID maps to Schemas.UnknownSchema
   */
  private def finalizePlan(source: ChunkSource,
                           querySession: QuerySession): SelectRawPartitionsExec = {
    val partMethod = FilteredPartitionScan(ShardSplit(shard), filters)
    Kamon.currentSpan().mark("filtered-partition-scan")
    var lookupRes = source.lookupPartitions(dataset, partMethod, chunkMethod, querySession)
    // NOTE(review): .head assumes a metric filter, when present, always carries at least one value
    // (e.g. an Equals filter) — confirm planners never produce an empty valuesStrings here.
    val metricName = filters.find(_.column == metricColumn).map(_.filter.valuesStrings.head.toString)
    var newColName = colName
    /*
     * As part of Histogram query compatibility with Prometheus format histograms, we
     * remove _sum, _count suffix from metric name here. _bucket & le are already
     * removed in SingleClusterPlanner. We remove the suffix only when partition lookup does not return any results
     */
    if (lookupRes.firstSchemaId.isEmpty && querySession.queryConfig.translatePromToFilodbHistogram &&
        colName.isEmpty && metricName.isDefined) {
      val res = if (metricName.get.endsWith("_sum"))
                  removeSuffixAndGenerateLookupResult(filters, metricName.get, "sum", source, querySession)
                else if (metricName.get.endsWith("_count"))
                  removeSuffixAndGenerateLookupResult(filters, metricName.get, "count", source, querySession)
                else (lookupRes, newColName)
      lookupRes = res._1
      newColName = res._2
    }
    Kamon.currentSpan().mark("lookup-partitions-done")
    queryContext.checkQueryTimeout(this.getClass.getName)

    // Find the schema if one wasn't supplied
    val schemas = source.schemas(dataset).get
    // If we cannot find a schema, or none is provided, we cannot move ahead with specific SRPE planning
    schema.map { s => schemas.schemas(s) }
          .orElse(lookupRes.firstSchemaId.map(schemas.apply))
          .map { sch =>
            // There should not be any unknown schemas at all, as they are filtered out during ingestion and
            // in bootstrapPartKeys(). This might happen after schema changes if old partkeys are not truncated.
            if (sch == Schemas.UnknownSchema) throw UnknownSchemaQueryErr(lookupRes.firstSchemaId.getOrElse(-1))
            // Get exact column IDs needed, including max column as needed for histogram calculations.
            // This code is responsible for putting exact IDs needed by any range functions.
            val colIDs1 = getColumnIDs(sch, newColName.toSeq, rangeVectorTransformers)
            val colIDs = addIDsForHistMax(sch, colIDs1)
            // Modify transformers as needed for histogram w/ max, downsample, other schemas
            val newxformers1 = newXFormersForDownsample(sch, rangeVectorTransformers)
            val newxformers = newXFormersForHistMax(sch, colIDs, newxformers1)
            val newPlan = SelectRawPartitionsExec(queryContext, dispatcher, dataset,
                                                  Some(sch), Some(lookupRes),
                                                  schema.isDefined, colIDs, planId)
            qLogger.debug(s"Discovered schema ${sch.name} and created inner plan $newPlan")
            newxformers.foreach { xf => newPlan.addRangeVectorTransformer(xf) }
            newPlan
          }.getOrElse {
            // No schema determined: return an empty inner plan, still carrying lookupRes so that
            // stats/limit accounting (e.g. checkResultBytes below) keeps working.
            qLogger.debug(s"No time series found for filters $filters... employing empty plan")
            SelectRawPartitionsExec(queryContext, dispatcher, dataset,
                                    None, Some(lookupRes),
                                    schema.isDefined, Nil, planId)
          }
  }
  // scalastyle:on method.length

  /**
   * Builds the inner SelectRawPartitionsExec (saved in finalPlan) and delegates execution to it.
   * Checks shard readiness and acquires a shared lock on the shard before planning.
   */
  def doExecute(source: ChunkSource,
                querySession: QuerySession)
               (implicit sched: Scheduler): ExecResult = {
    source.checkReadyForQuery(dataset, shard, querySession)
    source.acquireSharedLock(dataset, shard, querySession)
    finalPlan = finalizePlan(source, querySession)
    finalPlan.doExecute(source, querySession)(sched)
  }

  // Human-readable argument summary used in plan printouts.
  protected def args: String = s"dataset=$dataset, shard=$shard, " +
    s"chunkMethod=$chunkMethod, filters=$filters, colName=$colName, schema=$schema"

  // Print inner node's details for debugging; the Option(finalPlan) guard makes this safe before execution.
  override def curNodeText(level: Int): String = {
    val innerText = Option(finalPlan).map(e => s"Inner: + ${e.curNodeText(level + 1)}\n").getOrElse("")
    s"${super.curNodeText(level)} $innerText".trim
  }

  // Prints transformers from the inner plan when it exists (post-execution), otherwise from this plan.
  override protected def printRangeVectorTransformersForLevel(level: Int = 0) = {
    Option(finalPlan).getOrElse(this).rangeVectorTransformers.reverse.zipWithIndex.map { case (t, i) =>
      s"${"-" * (level + i)}T~${t.getClass.getSimpleName}(${t.args})" +
        printFunctionArgument(t, level + i + 1).mkString("\n")
    }
  }

  // On top of the base checks, records a warning (and logs) when the bytes scanned by this shard's
  // lookup exceed the per-shard rawScannedBytes warn limit.
  // NOTE(review): unlike curNodeText, this dereferences finalPlan without a null guard — assumes it
  // is only called after doExecute; confirm against the ExecPlan lifecycle.
  override def checkResultBytes(resultSize: Long, queryConfig: QueryConfig, queryWarnings: QueryWarnings): Unit = {
    super.checkResultBytes(resultSize, queryConfig, queryWarnings)
    finalPlan.lookupRes.foreach(plr =>
      if (plr.dataBytesScannedCtr.get() > queryContext.plannerParams.warnLimits.rawScannedBytes) {
        queryWarnings.updateRawScannedBytes(plr.dataBytesScannedCtr.get())
        val msg =
          s"Query scanned ${plr.dataBytesScannedCtr.get()} bytes, which exceeds a max warn limit of " +
          s"${queryContext.plannerParams.warnLimits.rawScannedBytes} bytes allowed to be scanned per shard. "
        qLogger.info(queryContext.getQueryLogLine(msg))
      }
    )
  }
}