feat(query): add TenantIngestionMetering query throttle #1310

Open · wants to merge 4 commits into base: develop
74 changes: 74 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala
@@ -0,0 +1,74 @@
package filodb.coordinator

import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.duration.FiniteDuration

/**
 * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to query attempts.
 * For example, with lookback = 10 and timeoutThreshold = 0.15, the interval is extended (and the
 * window reset) once more than 1.5 of the last ten queries, i.e. at least two, have timed out.
 *
 * @param queryInterval the initial delay between each query.
 * @param intervalDiff duration added to the interval whenever the ratio of timeouts:queries in the
 *                     lookback window exceeds timeoutThreshold
 * @param timeoutThreshold ratio of timeouts:queries at which intervalDiff is added to the interval
 * @param lookback number of most-recent queries across which timeoutThreshold is checked
 */
class QueryThrottle(queryInterval: FiniteDuration,
intervalDiff: FiniteDuration,
timeoutThreshold: Double,
lookback: Int) extends StrictLogging {

private var interval: FiniteDuration = queryInterval.copy()

// these track timeouts for the past `lookback` queries;
// the Int bitfield caps lookback at 32
private var bits = 0 // "1" indicates a timeout
private var ibit = 0 // index of the next bit to set

/**
 * Sets the next lookback bit and advances ibit, wrapping at lookback.
 */
private def setNextBit(bit: Boolean): Unit = {
val bitVal = if (bit) 1 else 0
bits = bits & ~(1 << ibit) // zero the bit
bits = bits | (bitVal << ibit) // 'or' in the new bit
ibit = ibit + 1
if (ibit == lookback) {
ibit = 0
}
}

/**
* Updates the interval according to the timeout:non-timeout ratio.
*/
private def updateInterval(): Unit = {
val failureRate = Integer.bitCount(bits).toDouble / lookback
if (failureRate > timeoutThreshold) {
interval = interval + intervalDiff
logger.info("too many timeouts; query interval extended to " + interval.toString())
// reset the bits
bits = 0
}
}

/**
* Record a query timeout.
*/
def recordTimeout(): Unit = {
setNextBit(true)
updateInterval()
}

/**
* Record a query non-timeout.
*/
def recordOnTime(): Unit = {
setNextBit(false)
updateInterval()
}

/**
* Returns the current query interval.
*/
def getInterval(): FiniteDuration = {
interval.copy()
}
}
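
For illustration, a minimal usage sketch (the values are arbitrary, not taken from this PR): with lookback = 4 and timeoutThreshold = 0.5, a third timeout in the window pushes the ratio to 3/4 > 0.5, which extends the interval and resets the window.

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

val throttle = new QueryThrottle(
  FiniteDuration(30, TimeUnit.SECONDS), // initial query interval
  FiniteDuration(10, TimeUnit.SECONDS), // added on each threshold breach
  timeoutThreshold = 0.5,
  lookback = 4)

throttle.recordTimeout()
throttle.recordTimeout()
throttle.getInterval() // still 30 seconds: 2/4 is not > 0.5
throttle.recordTimeout()
throttle.getInterval() // 40 seconds: 3/4 > 0.5, interval extended, window reset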
@@ -39,12 +39,10 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,

private val _tenantIngestionMeteringOpt =
if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
val inst = TenantIngestionMetering(
Some(new TenantIngestionMetering(
settings,
() => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
() => { _coordinators.head._2 })
inst.schedulePeriodicPublishJob()
Contributor Author commented:
No real reason to require this method to be called after initialization.

Some(inst)
() => { _coordinators.head._2 }))
} else None

val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")
@@ -1,21 +1,34 @@
package filodb.coordinator

import java.util.concurrent.TimeUnit
import java.util.concurrent.{TimeoutException, TimeUnit}

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}

import akka.actor.ActorRef
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.tag.TagSet
import monix.eval.Task
import monix.execution.Scheduler.Implicits.{global => scheduler}
import monix.reactive.Observable

import filodb.coordinator.client.Client
import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
import filodb.core.DatasetRef
import filodb.query.{QueryError, QueryResult, TsCardinalities}

object TenantIngestionMetering {
protected val METRIC_ACTIVE = "active_timeseries_by_tenant"
protected val METRIC_TOTAL = "total_timeseries_by_tenant"
protected val PARALLELISM = 8 // number of datasets queried in parallel
Contributor Author commented:
Unsure what this should be. Should I make it configurable? Or get the number of cores from the JRE?
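
One possible default, sketched here as a suggestion only (nothing below is in the PR, and whether a core-count default is appropriate is the open question above):

// e.g. default to the JVM's visible core count instead of a hard-coded constant
protected val PARALLELISM = Runtime.getRuntime.availableProcessors()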


// QueryThrottle args
protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
protected val LOOKBACK = 10
protected val TIMEOUT_THRESHOLD = 0.15
}

/**
* Periodically queries a node for all namespace cardinalities.
* Kamon gauges are updated with the response data.
@@ -27,63 +40,91 @@ import filodb.query.{QueryError, QueryResult, TsCardinalities}
* @param coordActorProducer produces a single actor to ask a query. Actors are
* queried in the order they're returned from this function.
*/
case class TenantIngestionMetering(settings: FilodbSettings,
dsIterProducer: () => Iterator[DatasetRef],
coordActorProducer: () => ActorRef) extends StrictLogging{

private val ASK_TIMEOUT = FiniteDuration(
settings.config.getDuration("metering-query-interval").toSeconds,
TimeUnit.SECONDS)
private val SCHED_INIT_DELAY = ASK_TIMEOUT // time until first job is scheduled
private val SCHED_DELAY = ASK_TIMEOUT // time between all jobs after the first

private val CLUSTER_TYPE = settings.config.getString("cluster-type")
class TenantIngestionMetering(settings: FilodbSettings,
dsIterProducer: () => Iterator[DatasetRef],
coordActorProducer: () => ActorRef) extends StrictLogging {
import TenantIngestionMetering._

private val METRIC_ACTIVE = "active_timeseries_by_tenant"
private val METRIC_TOTAL = "total_timeseries_by_tenant"
private val clusterType = settings.config.getString("cluster-type")
private var queryAskTimeSec = -1L // unix time of the most recent query ask
private val queryThrottle = new QueryThrottle(
FiniteDuration(settings.config.getDuration("metering-query-interval").toSeconds,
TimeUnit.SECONDS),
INTERVAL_DIFF,
TIMEOUT_THRESHOLD,
LOOKBACK)

def schedulePeriodicPublishJob() : Unit = {
// NOTE: the FiniteDuration overload of scheduleWithFixedDelay
// does not work. Unsure why, but that's why these FiniteDurations are
// awkwardly parsed into seconds.
scheduler.scheduleWithFixedDelay(
SCHED_INIT_DELAY.toSeconds,
SCHED_DELAY.toSeconds,
TimeUnit.SECONDS,
() => queryAndSchedulePublish())
}
// immediately begin periodically querying for / publishing cardinality data
queryAndSchedule()

// scalastyle:off method.length
/**
* For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
* Schedules a job to publish the Coordinator's response.
* For each dataset, asks a Coordinator with a TsCardinalities LogicalPlan.
* A publish job is scheduled for each response, and the next batch of
* queries is scheduled after all responses are processed/published.
*/
private def queryAndSchedulePublish() : Unit = {
private def queryAndSchedule() : Unit = {
import filodb.query.exec.TsCardExec._
val numGroupByFields = 2 // group cardinalities at the second level (i.e. ws & ns)
val prefix = Nil // query for cardinalities regardless of first-level name (i.e. ws name)
dsIterProducer().foreach { dsRef =>
val fut = Client.asyncAsk(
coordActorProducer(),
LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)),
ASK_TIMEOUT)
fut.onComplete {
case Success(QueryResult(_, _, rv, _, _, _)) =>
rv.foreach(_.rows().foreach{ rr =>
// publish a cardinality metric for each namespace
val data = RowData.fromRowReader(rr)
val prefix = data.group.toString.split(PREFIX_DELIM)
val tags = Map("metric_ws" -> prefix(0),
"metric_ns" -> prefix(1),
"dataset" -> dsRef.dataset,
"cluster_type" -> CLUSTER_TYPE)
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
})
case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
case Failure(t) => logger.warn("Failure: " + t.getMessage)
// required to compile
case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)

// Nil prefix in order to query all client-owned workspaces;
// numGroupByFields = 2 to group by (ws, ns)
val tsCardQuery = TsCardinalities(Nil, 2)

// use this later to find total elapsed time
queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond

Observable.fromIterator(dsIterProducer()).mapAsync(PARALLELISM){ dsRef =>
// Asynchronously query each dataset; store (dsRef, queryResult) pairs
Task{
val qres = Client.actorAsk(
coordActorProducer(),
LogicalPlan2Query(dsRef, tsCardQuery),
queryThrottle.getInterval()){
case t: Try[Any] => t
}
(dsRef, qres)
}
}.foreach { case (dsRef, qres) => qres match {
// process the query results one-at-a-time (prevents the need for locks in QueryThrottle)
case Success(qresp) =>
queryThrottle.recordOnTime()
qresp match {
case QueryResult(_, _, rv, _, _, _) =>
rv.foreach(_.rows().foreach { rr =>
// publish a cardinality metric for each namespace
val data = RowData.fromRowReader(rr)
val prefix = data.group.toString.split(PREFIX_DELIM)
val tags = Map(
"metric_ws" -> prefix(0),
"metric_ns" -> prefix(1),
"dataset" -> dsRef.dataset,
"cluster_type" -> clusterType)
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
})
case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
}
case Failure(t) =>
logger.warn("Failure: " + t.getMessage)
if (t.isInstanceOf[TimeoutException]) {
queryThrottle.recordTimeout()
} else {
queryThrottle.recordOnTime()
}
// required to compile
case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + qres)
}
}.onComplete { _ =>
// Schedule the next query batch at the beginning of the next interval; e.g. with a
// 15-minute interval and 20s spent asking/processing, the next batch fires 880s from now.
// Note: this "batch delay" setup keeps the TIM config simple; only metering-query-interval
// needs to be configured. But it assumes the time required to sequentially process each
// response is negligible with respect to metering-query-interval.
val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
scheduler.scheduleOnce(
math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
TimeUnit.SECONDS,
() => queryAndSchedule())
}
}
// scalastyle:on method.length
}
@@ -0,0 +1,73 @@
package filodb.coordinator

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

class QueryThrottleSpec extends AnyFunSpec with Matchers {
it("should correctly increment query delay") {

val INIT_INTERVAL = FiniteDuration(10, TimeUnit.SECONDS)
val INTERVAL_DIFF = FiniteDuration(1, TimeUnit.SECONDS)

// two timeouts are allowed before the interval is incremented
val throttle = new QueryThrottle(
INIT_INTERVAL,
INTERVAL_DIFF,
timeoutThreshold=0.67,
lookback=3)

// on-times should not change the interval
throttle.getInterval() shouldEqual INIT_INTERVAL
throttle.recordOnTime()
throttle.getInterval() shouldEqual INIT_INTERVAL
throttle.recordOnTime()
throttle.getInterval() shouldEqual INIT_INTERVAL
throttle.recordOnTime()
throttle.getInterval() shouldEqual INIT_INTERVAL

// use this to track the expected interval
var interval = INIT_INTERVAL

// fail twice
throttle.recordTimeout()
throttle.getInterval() shouldEqual interval
throttle.recordTimeout()
throttle.getInterval() shouldEqual interval

// next failure should increment interval
interval = interval + INTERVAL_DIFF
throttle.recordTimeout()
throttle.getInterval() shouldEqual interval

// failure counter should reset; fail twice again
throttle.recordTimeout()
throttle.getInterval() shouldEqual interval
throttle.recordTimeout()
throttle.getInterval() shouldEqual interval

// next failure should increment the interval
interval = interval + INTERVAL_DIFF
throttle.recordTimeout()
throttle.getInterval() shouldEqual interval

// success-failure-success-etc should not change the interval
throttle.recordTimeout()
throttle.recordOnTime()
throttle.recordTimeout()
throttle.recordOnTime()
throttle.recordTimeout()
throttle.recordOnTime()
throttle.getInterval() shouldEqual interval

// successes should not reset the interval to its initial value
throttle.recordOnTime()
throttle.recordOnTime()
throttle.recordOnTime()
throttle.recordOnTime()
throttle.recordOnTime()
throttle.getInterval() shouldEqual interval
}
}