remove throttle locks
alextheimer committed Jan 4, 2022
1 parent 6ad22b4 commit 6cb0c28
Showing 1 changed file with 61 additions and 59 deletions.
@@ -1,16 +1,17 @@
package filodb.coordinator

import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}

import akka.actor.ActorRef
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.tag.TagSet
import monix.eval.Task
import monix.execution.Scheduler.Implicits.{global => scheduler}
import monix.reactive.Observable

import filodb.coordinator.client.Client
import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
@@ -29,6 +30,7 @@ object QueryThrottle
object TenantIngestionMetering {
protected val METRIC_ACTIVE = "active_timeseries_by_tenant"
protected val METRIC_TOTAL = "total_timeseries_by_tenant"
protected val PARALLELISM = 8 // number of datasets queried in parallel
}
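
For reference, a hedged sketch of how these two gauges get published per tenant, mirroring the Kamon calls in the final hunk below; the tag values here are illustrative, not from the commit:

import kamon.Kamon
import kamon.tag.TagSet

val tags = TagSet.from(Map(
  "metric_ws" -> "demo-ws",       // workspace (illustrative)
  "metric_ns" -> "demo-ns",       // namespace (illustrative)
  "dataset" -> "prometheus",      // illustrative dataset name
  "cluster_type" -> "raw"))       // illustrative cluster type
// update() overwrites the gauge with the latest cardinality count
Kamon.gauge("active_timeseries_by_tenant").withTags(tags).update(12345d)
Kamon.gauge("total_timeseries_by_tenant").withTags(tags).update(67890d)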

/**
@@ -40,7 +42,6 @@ class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging {
import QueryThrottle._

private var interval: FiniteDuration = queryInterval.copy()
private val lock = new ReentrantReadWriteLock()

// these track timeouts for the past LOOKBACK queries
private var bits = (1 << LOOKBACK) - 1
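
setNextBit and updateInterval are collapsed out of this diff. A hypothetical sketch of the bit-window idea they implement, assuming an 8-query window and a simple doubling backoff (both assumptions, not the FiloDB source):

import scala.concurrent.duration.FiniteDuration

// Hypothetical sketch only. Bit i of `bits` records whether one of the
// last LOOKBACK queries finished on time (1) or timed out (0).
class ThrottleWindowSketch(private var interval: FiniteDuration) {
  private val LOOKBACK = 8                  // assumed window size
  private var bits = (1 << LOOKBACK) - 1    // start optimistic: all on time

  def setNextBit(onTime: Boolean): Unit = {
    val bit = if (onTime) 1 else 0
    // shift in the newest result; the mask drops the oldest bit
    bits = ((bits << 1) | bit) & ((1 << LOOKBACK) - 1)
  }

  def updateInterval(): Unit = {
    // assumed policy: if half the recent queries timed out, slow down
    if (Integer.bitCount(bits) < LOOKBACK / 2) {
      interval = interval * 2
      bits = (1 << LOOKBACK) - 1            // reset the window at the new pace
    }
  }

  def getInterval(): FiniteDuration = interval
}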
@@ -76,30 +77,23 @@ class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging {
* Record a query timeout.
*/
def recordTimeout(): Unit = {
lock.writeLock().lock()
setNextBit(false)
updateInterval()
lock.writeLock().unlock()
}

/**
* Record a query non-timeout.
*/
def recordOnTime(): Unit = {
lock.writeLock().lock()
setNextBit(true)
updateInterval()
lock.writeLock().unlock()
}

/**
* Returns the current query interval.
*/
def getInterval(): FiniteDuration = {
lock.readLock().lock()
val currInterval = interval.copy()
lock.readLock().unlock()
currInterval
interval.copy()
}
}
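
With the ReentrantReadWriteLock gone, QueryThrottle is only safe when recordTimeout/recordOnTime are never invoked concurrently; the rewritten pipeline below guarantees that by funneling every response through a single foreach consumer. An illustrative (non-FiloDB) usage, with a made-up starting interval:

import java.util.concurrent.{TimeoutException, TimeUnit}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

val throttle = new QueryThrottle(FiniteDuration(30, TimeUnit.SECONDS))

// All calls below happen from one logical consumer -- no lock required.
def record(result: Try[Any]): Unit = result match {
  case Success(_)                   => throttle.recordOnTime()
  case Failure(_: TimeoutException) => throttle.recordTimeout()
  case Failure(_)                   => throttle.recordOnTime()  // non-timeout failures still count as on time
}

val nextTimeout: FiniteDuration = throttle.getInterval()  // timeout for the next ask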

@@ -130,63 +124,71 @@ class TenantIngestionMetering(settings: FilodbSettings,

// scalastyle:off method.length
/**
* For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
* Schedules:
* (1) a job to publish the Coordinator's response, and
* (2) a job to execute the next query
* For each dataset, asks a Coordinator with a TsCardinalities LogicalPlan.
* A publish job is scheduled for each response, and the next batch of
* queries is scheduled after all responses are processed/published.
*/
private def queryAndSchedule() : Unit = {
import filodb.query.exec.TsCardExec._
val numGroupByFields = 2 // group cardinalities at the second level (i.e. ws & ns)
val prefix = Nil // query for cardinalities regardless of first-level name (i.e. ws name)

// Nil prefix in order to query all client-owned workspaces;
// numGroupByFields = 2 to group by (ws, ns)
val tsCardQuery = TsCardinalities(Nil, 2)

// use this later to find total elapsed time
queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond

dsIterProducer().foreach { dsRef =>
val fut = Client.asyncAsk(
coordActorProducer(),
LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)),
queryThrottle.getInterval())

fut.onComplete { tryRes =>
tryRes match {
case Success(qresp) =>
queryThrottle.recordOnTime()
qresp match {
case QueryResult(_, _, rv, _, _, _) =>
rv.foreach(_.rows().foreach{ rr =>
// publish a cardinality metric for each namespace
val data = RowData.fromRowReader(rr)
val prefix = data.group.toString.split(PREFIX_DELIM)
val tags = Map(
"metric_ws" -> prefix(0),
"metric_ns" -> prefix(1),
"dataset" -> dsRef.dataset,
"cluster_type" -> clusterType)
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
})
case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
}
case Failure(t) =>
logger.warn("Failure: " + t.getMessage)
if (t.isInstanceOf[TimeoutException]) {
queryThrottle.recordTimeout()
} else {
queryThrottle.recordOnTime()
}
// required to compile
case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
Observable.fromIterator(dsIterProducer()).mapAsync(PARALLELISM){ dsRef =>
// Asynchronously query each dataset; store (dsRef, queryResult) pairs
Task{
val qres = Client.actorAsk(
coordActorProducer(),
LogicalPlan2Query(dsRef, tsCardQuery),
queryThrottle.getInterval()){
case t: Try[Any] => t
}
(dsRef, qres)
}
}.foreach { case (dsRef, qres) => qres match {
// process the query results one-at-a-time (prevents the need for locks in QueryThrottle)
case Success(qresp) =>
queryThrottle.recordOnTime()
qresp match {
case QueryResult(_, _, rv, _, _, _) =>
rv.foreach(_.rows().foreach { rr =>
// publish a cardinality metric for each namespace
val data = RowData.fromRowReader(rr)
val prefix = data.group.toString.split(PREFIX_DELIM)
val tags = Map(
"metric_ws" -> prefix(0),
"metric_ns" -> prefix(1),
"dataset" -> dsRef.dataset,
"cluster_type" -> clusterType)
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
})
case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
}

// Delay the next query until the beginning of the next interval.
val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
scheduler.scheduleOnce(
math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
TimeUnit.SECONDS,
() => queryAndSchedule())
case Failure(t) =>
logger.warn("Failure: " + t.getMessage)
if (t.isInstanceOf[TimeoutException]) {
queryThrottle.recordTimeout()
} else {
queryThrottle.recordOnTime()
}
// required to compile
case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + qres)
}
}.onComplete { _ =>
// Schedule the next query batch at the beginning of the next interval.
// Note: this "batch delay" setup is intended to keep the TIM config simple; only metering-query-interval
// needs to be configured. But it assumes the time required to sequentially process each
// response is negligible with respect to metering-query-interval.
val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
scheduler.scheduleOnce(
math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
TimeUnit.SECONDS,
() => queryAndSchedule())
}
}
// scalastyle:on method.length
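
Read as a whole, the new flow is: bounded-parallel asks via mapAsync, one serial consumer that publishes results and updates the throttle, then a re-arm via scheduleOnce. A self-contained sketch of the same shape, using the same monix operators as the hunk above (and so the same monix version the project builds against); dataset names, results, and the 10-second interval are illustrative:

import java.util.concurrent.TimeUnit
import monix.eval.Task
import monix.execution.Scheduler.Implicits.{global => scheduler}
import monix.reactive.Observable

object PipelineSketch extends App {
  val PARALLELISM = 8            // bound on in-flight queries, as in the commit
  val INTERVAL_SEC = 10L         // illustrative stand-in for queryThrottle.getInterval()
  var published = 0              // mutated only inside foreach: single consumer, no lock

  def queryAndSchedule(): Unit = {
    val startSec = System.currentTimeMillis() / 1000
    Observable.fromIterable(Seq("ds-a", "ds-b", "ds-c"))     // stand-in for dsIterProducer()
      .mapAsync(PARALLELISM) { dsRef =>
        Task((dsRef, s"cardinality-of-$dsRef"))              // stand-in for Client.actorAsk
      }
      .foreach { case (dsRef, result) =>                     // serial publish step
        published += 1
        println(s"$dsRef -> $result ($published published)")
      }
      .onComplete { _ =>
        // re-arm the next batch at the start of the next interval
        val elapsedSec = System.currentTimeMillis() / 1000 - startSec
        scheduler.scheduleOnce(math.max(0L, INTERVAL_SEC - elapsedSec),
          TimeUnit.SECONDS, () => queryAndSchedule())
      }
  }

  queryAndSchedule()
  Thread.sleep(35000)  // demo only: keep the JVM alive for a few cycles
}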
