fix(query): override RemoteExec::doExecute instead of execute() #1490

Open · wants to merge 4 commits into base: develop
3 changes: 3 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -284,6 +284,9 @@ filodb {
# Choices are "legacy", "antlr", and "shadow". Shadow mode uses legacy but also checks antlr for errors.
parser = "antlr"

# When true, executes RemoteExec plans via the legacy path, without invoking super.execute().
enable-legacy-remote-execute = false

routing {
# not currently used
}
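This flag is read later in the diff through GlobalConfig.systemConfig.getBoolean("filodb.query.enable-legacy-remote-execute"). As a rough, self-contained sketch of how such a boolean is pulled out of a Typesafe Config tree (the object name and the inline config string below are illustrative, not FiloDB's actual wiring):

import com.typesafe.config.ConfigFactory

object LegacyRemoteExecFlagSketch extends App {
  // Stand-in for filodb-defaults.conf; FiloDB itself exposes the merged
  // configuration through GlobalConfig.systemConfig rather than parsing it inline.
  val config = ConfigFactory.parseString(
    "filodb.query.enable-legacy-remote-execute = false")

  val useLegacy = config.getBoolean("filodb.query.enable-legacy-remote-execute")
  println(s"legacy remote execute enabled: $useLegacy")
}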
55 changes: 48 additions & 7 deletions query/src/main/scala/filodb/query/exec/RemoteExec.scala
@@ -2,8 +2,8 @@ package filodb.query.exec

import java.util.concurrent.TimeUnit

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
import scala.sys.ShutdownHookThread

import com.softwaremill.sttp.{DeserializationError, Response, SttpBackend, SttpBackendOptions}
@@ -15,9 +15,11 @@ import kamon.Kamon
import kamon.trace.Span
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import org.asynchttpclient.{AsyncHttpClientConfig, DefaultAsyncHttpClientConfig}
import org.asynchttpclient.proxy.ProxyServer

import filodb.core.{GlobalConfig, QueryTimeoutException}
import filodb.core.query.{PromQlQueryParams, QuerySession, QueryStats}
import filodb.core.store.ChunkSource
import filodb.query._
@@ -42,13 +44,42 @@ trait RemoteExec extends LeafExecPlan with StrictLogging {
/**
* Since execute is already overridden here, doExecute() can be empty.
*/
def doExecute(source: ChunkSource,
override def doExecute(source: ChunkSource,
querySession: QuerySession)
(implicit sched: Scheduler): ExecResult = ???
(implicit sched: Scheduler): ExecResult = {
if (queryEndpoint == null) {
throw new BadQueryException("Remote Query endpoint can not be null in RemoteExec.")
}

override def execute(source: ChunkSource,
querySession: QuerySession)
(implicit sched: Scheduler): Task[QueryResponse] = {
// Please note that the following needs to be wrapped inside `runWithSpan` so that the context will be propagated
// across threads. Note that task/observable will not run on the thread where span is present since
// kamon uses thread-locals.
val span = Kamon.currentSpan()
val elapsedMillis = System.currentTimeMillis() - queryContext.submitTime
val remainingMillis = queryContext.plannerParams.queryTimeoutMillis - elapsedMillis
if (remainingMillis <= 0) {
throw QueryTimeoutException(elapsedMillis, "RemoteExec::doExecute (before remote request)")
}
// Don't finish the span since this code didn't create it
val fut = Kamon.runWithSpan(span, false) {
sendHttpRequest(span, requestTimeoutMs)
.map {
case QueryResult(_, resultSchema, result, _, _, _) =>
ExecResult(Observable.fromIterable(result), Task.now(resultSchema))
case QueryError(_, _, t) => throw t
}
}
// FIXME: remote plans probably need their own dispatcher so they've got access
// to ExecPlanWithClientParams (and can therefore return partial results).
Await.result(fut, FiniteDuration(remainingMillis, MILLISECONDS))
}

/**
* Legacy execute() logic; does not invoke super.execute().
* FIXME: this method should eventually be removed.
*/
def executeLegacy(source: ChunkSource,
querySession: QuerySession)(implicit sched: Scheduler): Task[QueryResponse] = {
if (queryEndpoint == null) {
throw new BadQueryException("Remote Query endpoint can not be null in RemoteExec.")
}
@@ -63,6 +94,16 @@ trait RemoteExec extends LeafExecPlan with StrictLogging {
}
}

override def execute(source: ChunkSource,
querySession: QuerySession)(implicit sched: Scheduler): Task[QueryResponse] = {
// NOTE: this should no longer be overridden once executeLegacy is removed.
if (GlobalConfig.systemConfig.getBoolean("filodb.query.enable-legacy-remote-execute")) {
executeLegacy(source, querySession)
} else {
super.execute(source, querySession)
}
}

def sendHttpRequest(execPlan2Span: Span, httpTimeoutMs: Long)
(implicit sched: Scheduler): Future[QueryResponse]

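The new doExecute blocks on the remote HTTP future, budgeting the wait against the time the query has already spent. A minimal standalone sketch of that budget arithmetic, with placeholder values standing in for queryContext.submitTime and plannerParams.queryTimeoutMillis, and a stubbed Future in place of sendHttpRequest (all names here are illustrative):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}

object RemainingBudgetSketch extends App {
  val submitTime = System.currentTimeMillis()   // stands in for queryContext.submitTime
  val queryTimeoutMillis = 30000L               // stands in for plannerParams.queryTimeoutMillis

  // Same arithmetic as the new doExecute: subtract the time already spent from
  // the overall query timeout and fail fast if nothing is left.
  val elapsedMillis = System.currentTimeMillis() - submitTime
  val remainingMillis = queryTimeoutMillis - elapsedMillis
  require(remainingMillis > 0, s"query timed out after ${elapsedMillis}ms")

  // Stand-in for sendHttpRequest(...), which returns a Future[QueryResponse].
  val fut: Future[String] = Future { "remote response" }
  println(Await.result(fut, FiniteDuration(remainingMillis, MILLISECONDS)))
}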
60 changes: 57 additions & 3 deletions query/src/test/scala/filodb/query/exec/PromQlRemoteExecSpec.scala
@@ -1,19 +1,26 @@
package filodb.query.exec

import com.typesafe.config.ConfigFactory
import filodb.core.MetricsTestData
import filodb.core.metadata.Column.ColumnType
import kamon.Kamon
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import filodb.core.metadata.{Dataset, DatasetOptions}
import filodb.core.query.{PromQlQueryParams, QueryContext}
import filodb.core.query.{ColumnInfo, CustomRangeVectorKey, PromQlQueryParams, QueryConfig, QueryContext, QuerySession, RangeParams, ResultSchema, RvRange}
import filodb.core.store.ChunkSource
import filodb.memory.format.ZeroCopyUTF8String.StringToUTF8
import filodb.memory.format.vectors.MutableHistogram
import filodb.query
import filodb.query.{Data, HistSampl, MetadataMapSampl, MetadataSuccessResponse, QueryResponse, QueryResult, Sampl, StreamQueryResponse, SuccessResponse}
import filodb.query.{BinaryOperator, Data, HistSampl, MetadataMapSampl, MetadataSuccessResponse, QueryResponse, QueryResult, Sampl, StreamQueryResponse, SuccessResponse}
import monix.execution.Scheduler.global

import scala.concurrent.Await
import scala.concurrent.duration.Duration


class PromQlRemoteExecSpec extends AnyFunSpec with Matchers with ScalaFutures {
@@ -36,6 +43,9 @@ class PromQlRemoteExecSpec extends AnyFunSpec with Matchers with ScalaFutures {

val params = PromQlQueryParams("", 0, 0 , 0)
val queryContext = QueryContext(origQueryParams = params)
val config = ConfigFactory.load("application_test.conf")
val queryConfig = QueryConfig(config.getConfig("filodb.query"))

it ("should convert matrix Data to QueryResponse ") {
val expectedResult = List((1000000, 1.0), (2000000, 2.0), (3000000, 3.0))
val exec = PromQlRemoteExec("", 60000, queryContext, dummyDispatcher, timeseriesDataset.ref, RemoteHttpClient.defaultClient)
@@ -103,4 +113,48 @@ class PromQlRemoteExecSpec extends AnyFunSpec with Matchers with ScalaFutures {
hist.bucketValue(0) shouldEqual(2.0)
hist.bucketValue(1) shouldEqual(3)
}

it ("should correctly apply RVT") {
val data = Seq(
(CustomRangeVectorKey(Map("foo".utf8 -> "bar".utf8)),
Seq((1000L, 1.0), (2000L, 2.0))),
(CustomRangeVectorKey(Map("bat".utf8 -> "baz".utf8)),
Seq((1000L, 3.0), (2000L, 4.0)))
)
val range = RvRange(1000, 1000, 2000)
val resultSchema1 = ResultSchema(
Seq(ColumnInfo("timestamp", ColumnType.TimestampColumn),
ColumnInfo("value", ColumnType.DoubleColumn)),
numRowKeyColumns = 1,
fixedVectorLen = Some(123))
// Override doExecute to return the above data.
val exec = new PromQlRemoteExec("", 60000, queryContext, dummyDispatcher,
timeseriesDataset.ref, RemoteHttpClient.defaultClient) {
override def doExecute(source: ChunkSource, querySession: QuerySession)(implicit sched: Scheduler): ExecResult = {
val rvs = data.map { case (key, tsValPairs) =>
MetricsTestData.makeRv(key, tsValPairs, range)
}
ExecResult(Observable.fromIterable(rvs), Task.now(resultSchema1))
}
}
// Use an RVT to add `diff` to each value.
val diff = 10.0
exec.addRangeVectorTransformer(
ScalarOperationMapper(
BinaryOperator.ADD,
scalarOnLhs = false,
Seq(StaticFuncArgs(diff, RangeParams(1000, 1000, 1000)))))
val fut = exec.execute(UnsupportedChunkSource(), QuerySession(queryContext, queryConfig))(global)
.runToFuture(global)
val qres = Await.result(fut, Duration.Inf).asInstanceOf[QueryResult]
// Convert the result back to simple key / time-value tuples.
qres.result.map { rv =>
val key = rv.key.labelValues
val pairs = rv.rows().toSeq.map { r =>
// Subtract the diff from the result-- this should again equal the original data.
(r.getLong(0), r.getDouble(1) - diff)
}
(key, pairs)
} shouldEqual data.map{ rv => (rv._1.labelValues, rv._2)}
}
}
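The new test above checks that a RangeVectorTransformer attached to the plan is actually applied: it adds a constant through ScalarOperationMapper, then subtracts it from the output and compares against the input. A stripped-down sketch of the same round-trip check using plain Monix, with no FiloDB types (object and value names are illustrative):

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object RvtRoundTripSketch extends App {
  // Original (timestamp, value) samples, standing in for the test's range vectors.
  val original = List((1000L, 1.0), (2000L, 2.0))
  val diff = 10.0

  // "Transformer" stage: add a constant to every value, as ScalarOperationMapper does above.
  val transformed = Observable.fromIterable(original).map { case (t, v) => (t, v + diff) }

  // Undo the transformation on the output and check that it matches the input;
  // this is the same round-trip assertion the new test makes.
  val fut = transformed.map { case (t, v) => (t, v - diff) }.toListL.runToFuture
  assert(Await.result(fut, Duration.Inf) == original)
}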
query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala
@@ -167,16 +167,10 @@ class RemoteMetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures

val resp = exec.execute(memStore, querySession).runToFuture.futureValue
val result = (resp: @unchecked) match {
case QueryResult(id, _, response, _, _, _) => {
val rv = response(0)
rv.rows.size shouldEqual 0
rv.rows.map { row =>
val record = row.asInstanceOf[BinaryRecordRowReader]
rv.asInstanceOf[SerializedRangeVector].schema.toStringPairs(record.recordBase, record.recordOffset)
}
Comment on lines -170 to -176 (Contributor, PR author):

Sketched out by this-- @sherali42 , do we need the result to contain an empty RV?

case QueryResult(_, _, response, _, _, _) => {
response.size shouldEqual 0
}
}
result.toArray shouldEqual Array.empty
}

it ("label values remote metadata exec") {