-
Notifications
You must be signed in to change notification settings - Fork 224
/
LogicalPlanUtils.scala
145 lines (129 loc) · 6.73 KB
/
LogicalPlanUtils.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package filodb.coordinator.queryplanner
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
import filodb.query._
object LogicalPlanUtils {
/**
* Check whether all child logical plans have same start and end time
*/
def hasSingleTimeRange(logicalPlan: LogicalPlan): Boolean = {
logicalPlan match {
case binaryJoin: BinaryJoin =>
val lhsTime = getPeriodicSeriesTimeFromLogicalPlan(binaryJoin.lhs)
val rhsTime = getPeriodicSeriesTimeFromLogicalPlan(binaryJoin.rhs)
(lhsTime.startMs == rhsTime.startMs) && (lhsTime.endMs == rhsTime.endMs)
case _ => true
}
}
/**
* Retrieve start and end time from LogicalPlan
* NOTE: Plan should be PeriodicSeriesPlan
*/
def getPeriodicSeriesTimeFromLogicalPlan(logicalPlan: LogicalPlan): TimeRange = {
logicalPlan match {
case lp: PeriodicSeries => TimeRange(lp.startMs, lp.endMs)
case lp: PeriodicSeriesWithWindowing => TimeRange(lp.startMs, lp.endMs)
case lp: ApplyInstantFunction => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: Aggregate => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: BinaryJoin => // can assume lhs & rhs have same time
getPeriodicSeriesTimeFromLogicalPlan(lp.lhs)
case lp: ScalarVectorBinaryOperation => getPeriodicSeriesTimeFromLogicalPlan(lp.vector)
case lp: ApplyMiscellaneousFunction => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: ApplySortFunction => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: ScalarVaryingDoublePlan => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: ScalarTimeBasedPlan => TimeRange(lp.rangeParams.startSecs, lp.rangeParams.endSecs)
case lp: VectorPlan => getPeriodicSeriesTimeFromLogicalPlan(lp.scalars)
case lp: ApplyAbsentFunction => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case _ => throw new BadQueryException(s"Invalid logical plan")
}
}
/**
* Used to change start and end time(TimeRange) of LogicalPlan
* NOTE: Plan should be PeriodicSeriesPlan
*/
def copyWithUpdatedTimeRange(logicalPlan: LogicalPlan,
timeRange: TimeRange,
lookBackTime: Long): PeriodicSeriesPlan = {
logicalPlan match {
case lp: PeriodicSeries => lp.copy(startMs = timeRange.startMs,
endMs = timeRange.endMs,
rawSeries = copyNonPeriodicWithUpdatedTimeRange(lp.rawSeries, timeRange,
lookBackTime).asInstanceOf[RawSeries])
case lp: PeriodicSeriesWithWindowing => lp.copy(startMs = timeRange.startMs,
endMs = timeRange.endMs,
series = copyNonPeriodicWithUpdatedTimeRange(lp.series, timeRange,
lookBackTime))
case lp: ApplyInstantFunction => lp.copy(vectors = copyWithUpdatedTimeRange(lp.vectors, timeRange, lookBackTime))
case lp: Aggregate => lp.copy(vectors = copyWithUpdatedTimeRange(lp.vectors, timeRange, lookBackTime))
case lp: BinaryJoin => lp.copy(lhs = copyWithUpdatedTimeRange(lp.lhs, timeRange, lookBackTime),
rhs = copyWithUpdatedTimeRange(lp.rhs, timeRange, lookBackTime))
case lp: ScalarVectorBinaryOperation =>
lp.copy(vector = copyWithUpdatedTimeRange(lp.vector, timeRange, lookBackTime))
case lp: ApplyMiscellaneousFunction =>
lp.copy(vectors = copyWithUpdatedTimeRange(lp.vectors, timeRange, lookBackTime))
case lp: ApplySortFunction => lp.copy(vectors = copyWithUpdatedTimeRange(lp.vectors, timeRange, lookBackTime))
case _ => throw new UnsupportedOperationException("Logical plan not supported for copy")
}
}
/**
* Used to change rangeSelector of RawSeriesLikePlan
*/
private def copyNonPeriodicWithUpdatedTimeRange(plan: RawSeriesLikePlan,
timeRange: TimeRange,
lookBackTime: Long): RawSeriesLikePlan = {
plan match {
case rs: RawSeries => rs.rangeSelector match {
case is: IntervalSelector => rs.copy(rangeSelector = is.copy(timeRange.startMs, timeRange.endMs))
case _ => throw new UnsupportedOperationException("Copy supported only for IntervalSelector")
}
case p: ApplyInstantFunctionRaw =>
p.copy(vectors = copyNonPeriodicWithUpdatedTimeRange(p.vectors, timeRange, lookBackTime)
.asInstanceOf[RawSeries])
case _ => throw new UnsupportedOperationException("Copy supported only for RawSeries")
}
}
/**
* Retrieve start time of Raw Series
* NOTE: Plan should be PeriodicSeriesPlan
*/
def getRawSeriesStartTime(logicalPlan: LogicalPlan): Option[Long] = {
LogicalPlan.findLeafLogicalPlans(logicalPlan).head match {
case lp: RawSeries => lp.rangeSelector match {
case rs: IntervalSelector => Some(rs.from)
case _ => None
}
case _ => throw new BadQueryException(s"Invalid logical plan $logicalPlan")
}
}
def getOffsetMillis(logicalPlan: LogicalPlan): Long = {
LogicalPlan.findLeafLogicalPlans(logicalPlan).head match {
case lp: RawSeries => lp.offsetMs.getOrElse(0)
case _ => 0
}
}
def getLookBackMillis(logicalPlan: LogicalPlan): Long = {
val staleDataLookbackMillis = WindowConstants.staleDataLookbackMillis
LogicalPlan.findLeafLogicalPlans(logicalPlan).head match {
case lp: RawSeries => lp.lookbackMs.getOrElse(staleDataLookbackMillis)
case _ => 0
}
}
def getMetricName(logicalPlan: LogicalPlan, datasetMetricColumn: String): Option[Seq[String]] = {
val metricName = LogicalPlan.getLabelValueFromLogicalPlan(logicalPlan, PromMetricLabel)
if (metricName.isEmpty) LogicalPlan.getLabelValueFromLogicalPlan(logicalPlan, datasetMetricColumn)
else metricName
}
/**
* Renames Prom AST __name__ label to one based on the actual metric column of the dataset,
* if it is not the prometheus standard
*/
def renameLabels(labels: Seq[String], datasetMetricColumn: String): Seq[String] =
if (datasetMetricColumn != PromMetricLabel) {
labels map {
case PromMetricLabel => datasetMetricColumn
case other: String => other
}
} else {
labels
}
}