add TIM query throttle

alextheimer committed Dec 15, 2021
1 parent b21c240 commit ca17e67

Showing 2 changed files with 119 additions and 25 deletions.
@@ -39,7 +39,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
 
   private val _tenantIngestionMeteringOpt =
     if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
-      val inst = TenantIngestionMetering(
+      val inst = new TenantIngestionMetering(
        settings,
        () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
        () => { _coordinators.head._2 })
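
A note on the one-line call-site change above: the second file in this commit demotes TenantIngestionMetering from a case class to a plain class, which removes the compiler-generated companion apply method, so the call site must now use `new`. A minimal illustration of the difference (the Widget names are hypothetical, not from the commit):

    case class WidgetCase(n: Int)  // compiler synthesizes a WidgetCase.apply(n) companion
    class WidgetPlain(n: Int)      // no synthesized apply

    val a = WidgetCase(1)          // sugar for WidgetCase.apply(1)
    val b = new WidgetPlain(1)     // a plain class must be instantiated with `new`
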
@@ -1,6 +1,7 @@
 package filodb.coordinator
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.concurrent.duration.FiniteDuration
 import scala.util.{Failure, Success}
@@ -16,6 +17,83 @@ import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
 import filodb.core.DatasetRef
 import filodb.query.{QueryError, QueryResult, TsCardinalities}
 
+/**
+ * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts.
+ *
+ * @param queryDelay the initial delay between each query. This is the duration to be adjusted.
+ */
+private class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging {
+  // currently just adds this diff to the delay when the on-time rate drops below THRESHOLD
+  private val DELAY_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
+  // number of past query timeouts/non-timeouts to consider
+  private val LOOKBACK = 10
+  // the delay is extended when {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD
+  private val THRESHOLD = 0.85
+
+  private var delay: FiniteDuration = queryDelay
+  private val lock = new ReentrantReadWriteLock()
+
+  // these track timeouts for the past LOOKBACK queries (all ones = every query on-time)
+  private var bits = (1 << LOOKBACK) - 1
+  private var ibit = 0
+
+  /**
+   * Sets the next lookback bit and increments ibit.
+   */
+  private def setNextBit(bit: Boolean): Unit = {
+    val bitVal = if (bit) 1 else 0
+    bits = bits & ~(1 << ibit)     // zero the bit
+    bits = bits | (bitVal << ibit) // 'or' in the new bit
+    ibit = ibit + 1
+    if (ibit == LOOKBACK) {
+      ibit = 0
+    }
+  }
+
+  /**
+   * Updates the delay according to the timeout:non-timeout ratio.
+   */
+  private def updateDelay(): Unit = {
+    val successRate = Integer.bitCount(bits).toDouble / LOOKBACK
+    if (successRate < THRESHOLD) {
+      delay = delay + DELAY_DIFF
+      logger.info("too many timeouts; query delay extended to " + delay.toString())
+      // reset the bits
+      bits = (1 << LOOKBACK) - 1
+    }
+  }
+
+  /**
+   * Records a query timeout.
+   */
+  def recordTimeout(): Unit = {
+    lock.writeLock().lock()
+    setNextBit(false)
+    updateDelay()
+    lock.writeLock().unlock()
+  }
+
+  /**
+   * Records a query non-timeout.
+   */
+  def recordOnTime(): Unit = {
+    lock.writeLock().lock()
+    setNextBit(true)
+    updateDelay()
+    lock.writeLock().unlock()
+  }
+
+  /**
+   * Returns the current query delay.
+   */
+  def getDelay(): FiniteDuration = {
+    lock.readLock().lock()
+    val currDelay = delay
+    lock.readLock().unlock()
+    currDelay
+  }
+}
+
 /**
  * Periodically queries a node for all namespace cardinalities.
  * Kamon gauges are updated with the response data.
@@ -27,63 +105,79 @@ import filodb.query.{QueryError, QueryResult, TsCardinalities}
  * @param coordActorProducer produces a single actor to ask a query. Actors are
  *   queried in the order they're returned from this function.
  */
-case class TenantIngestionMetering(settings: FilodbSettings,
+class TenantIngestionMetering(settings: FilodbSettings,
                               dsIterProducer: () => Iterator[DatasetRef],
                               coordActorProducer: () => ActorRef) extends StrictLogging{
 
-  private val ASK_TIMEOUT = FiniteDuration(
+  // time until first query executes
+  private val SCHED_INIT_DELAY = FiniteDuration(
     settings.config.getDuration("metering-query-interval").toSeconds,
     TimeUnit.SECONDS)
-  private val SCHED_INIT_DELAY = ASK_TIMEOUT // time until first job is scheduled
-  private val SCHED_DELAY = ASK_TIMEOUT // time between all jobs after the first
 
   private val CLUSTER_TYPE = settings.config.getString("cluster-type")
 
   private val METRIC_ACTIVE = "active_timeseries_by_tenant"
   private val METRIC_TOTAL = "total_timeseries_by_tenant"
 
+  private val queryLimiter = new QueryThrottle(SCHED_INIT_DELAY)
+
   def schedulePeriodicPublishJob() : Unit = {
     // NOTE: the FiniteDuration overload of scheduleWithFixedDelay
     //   does not work. Unsure why, but that's why these FiniteDurations are
     //   awkwardly parsed into seconds.
-    scheduler.scheduleWithFixedDelay(
+    scheduler.scheduleOnce(
       SCHED_INIT_DELAY.toSeconds,
-      SCHED_DELAY.toSeconds,
       TimeUnit.SECONDS,
-      () => queryAndSchedulePublish())
+      () => queryAndSchedule())
   }
 
   /**
    * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
    * Schedules a job to publish the Coordinator's response.
    */
-  private def queryAndSchedulePublish() : Unit = {
+  private def queryAndSchedule() : Unit = {
     import filodb.query.exec.TsCardExec._
     val groupDepth = 1  // group cardinalities at the second level (i.e. ws & ns)
     val prefix = Nil    // query for cardinalities regardless of first-level name (i.e. ws name)
     dsIterProducer().foreach { dsRef =>
       val fut = Client.asyncAsk(
         coordActorProducer(),
         LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)),
-        ASK_TIMEOUT)
+        queryLimiter.getDelay())
       fut.onComplete {
-        case Success(QueryResult(_, _, rv, _, _, _)) =>
-          rv.foreach(_.rows().foreach{ rr =>
-            // publish a cardinality metric for each namespace
-            val data = RowData.fromRowReader(rr)
-            val prefix = data.group.toString.split(PREFIX_DELIM)
-            val tags = Map("metric_ws" -> prefix(0),
-                           "metric_ns" -> prefix(1),
-                           "dataset" -> dsRef.dataset,
-                           "cluster_type" -> CLUSTER_TYPE)
-            Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
-            Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
-          })
-        case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
-        case Failure(t) => logger.warn("Failure: " + t.getMessage)
+        case Success(qresp) =>
+          queryLimiter.recordOnTime()
+          qresp match {
+            case QueryResult(_, _, rv, _, _, _) =>
+              rv.foreach(_.rows().foreach{ rr =>
+                // publish a cardinality metric for each namespace
+                val data = RowData.fromRowReader(rr)
+                val prefix = data.group.toString.split(PREFIX_DELIM)
+                val tags = Map("metric_ws" -> prefix(0),
+                               "metric_ns" -> prefix(1),
+                               "dataset" -> dsRef.dataset,
+                               "cluster_type" -> CLUSTER_TYPE)
+                Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
+                Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
+              })
+            case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
+          }
+        case Failure(t) =>
+          logger.warn("Failure: " + t.getMessage)
+          if (t.isInstanceOf[TimeoutException]) {
+            queryLimiter.recordTimeout()
+          } else {
+            queryLimiter.recordOnTime()
+          }
+        // required to compile
+        case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
       }
     }
+
+    // schedule the next query
+    scheduler.scheduleOnce(
+      queryLimiter.getDelay().toSeconds,
+      TimeUnit.SECONDS,
+      () => queryAndSchedule())
   }
 }
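
To make the throttle arithmetic concrete: with LOOKBACK = 10 and THRESHOLD = 0.85, two timeouts anywhere in the window drop the on-time rate to 0.8 and extend the delay by DELAY_DIFF. A minimal standalone sketch of that bookkeeping, mirroring the QueryThrottle logic above (the ThrottleMath object and the initial 15-minute delay are illustrative assumptions, not part of the commit):

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration

    object ThrottleMath extends App {
      val LOOKBACK = 10
      val THRESHOLD = 0.85
      var bits = (1 << LOOKBACK) - 1                     // 0b1111111111: all queries on-time
      var delay = FiniteDuration(15L, TimeUnit.MINUTES)  // hypothetical initial delay

      bits &= ~(1 << 0)  // record a timeout in slot 0
      bits &= ~(1 << 1)  // record a timeout in slot 1

      val successRate = Integer.bitCount(bits).toDouble / LOOKBACK  // 8 / 10 = 0.8
      if (successRate < THRESHOLD) {                     // 0.8 < 0.85: too many timeouts
        delay += FiniteDuration(5L, TimeUnit.MINUTES)    // same 5-minute DELAY_DIFF step
      }
      println(s"successRate=$successRate, delay=$delay") // delay is now 20 minutes
    }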

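The move from scheduleWithFixedDelay to scheduleOnce is what makes the interval adaptive: each run re-reads the possibly-extended delay before scheduling its successor, whereas a fixed-delay schedule would keep the interval chosen at startup. A generic sketch of that self-rescheduling pattern on a plain ScheduledExecutorService, independent of FiloDB's scheduler (all names here are illustrative):

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    object AdaptiveLoop {
      private val exec: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
      @volatile private var delaySeconds = 60L  // extended whenever queries time out

      def start(): Unit =
        exec.schedule(new Runnable { def run(): Unit = tick() }, delaySeconds, TimeUnit.SECONDS)

      private def tick(): Unit = {
        // ... issue the query and record a timeout or an on-time completion ...
        // re-read the current delay so the next run reflects any throttling
        exec.schedule(new Runnable { def run(): Unit = tick() }, delaySeconds, TimeUnit.SECONDS)
      }
    }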
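
One last detail worth noting: in the Failure branch, only a TimeoutException feeds recordTimeout(); any other failure is recorded as on-time, so unrelated errors do not keep lengthening the delay. The same classification written as a small pattern match (classify and its parameters are stand-ins, not part of the commit):

    import java.util.concurrent.TimeoutException
    import scala.util.{Failure, Success, Try}

    def classify(result: Try[Any], onTimeout: () => Unit, onTime: () => Unit): Unit =
      result match {
        case Failure(_: TimeoutException) => onTimeout()  // only timeouts throttle the schedule
        case Success(_) | Failure(_)      => onTime()     // everything else counts as on-time
      }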