Skip to content

Commit

Permalink
Merge pull request #790 from broneill/master
Browse files Browse the repository at this point in the history
Merge integration to master version 0.9.7
  • Loading branch information
broneill committed Jun 15, 2020
2 parents 1765ed9 + 15d3767 commit 7388950
Show file tree
Hide file tree
Showing 105 changed files with 3,141 additions and 931 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -340,7 +340,7 @@ If you had run the unit test `DownsamplerMainSpec` which populates data into the
dataset, you can query downsample results by visiting the following URL:

```
curl "http://localhost:8080/promql/prometheus/api/v1/query_range?query=my_counter\{_ws_='my_ws',_ns_='my_ns'\}&start=1574272801&end=1574273042&step=10&verbose=true&spread=2"
curl "http://localhost:9080/promql/prometheus/api/v1/query_range?query=my_counter\{_ws_='my_ws',_ns_='my_ns'\}&start=74372801&end=74373042&step=10&verbose=true&spread=2"
```

#### Local Scale Testing
Expand Down
5 changes: 2 additions & 3 deletions conf/timeseries-dev-source.conf
Expand Up @@ -95,9 +95,8 @@
# metric = "bad-metric-to-log"
# }

# Maximum number of partitions per shard scanned per query. This is necessary
# to ensure no run-away query hogs memory and destabilizes the server.
# max-query-matches = 250000
# Limits maximum amount of data a single leaf query can scan
max-data-per-shard-query = 50 MB
}
downsample {
# can be disabled by setting this flag to false
Expand Down
19 changes: 15 additions & 4 deletions coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala
Expand Up @@ -109,6 +109,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
* reconciles any differences. It does so by stopping ingestion for shards that aren't mapped
* to this node, and it starts ingestion for those that are.
*/
// scalastyle:off method.length
private def resync(state: ShardIngestionState, origin: ActorRef): Unit = {
if (invalid(state.ref)) {
logger.error(s"$state is invalid for this ingester '$ref'.")
Expand Down Expand Up @@ -144,7 +145,13 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
}
} else {
val status = state.map.statuses(shard)
logger.info(s"Will stop ingestion of for dataset=$ref shard=$shard due to status ${status}")
if (shardsToStop.contains(shard)) {
logger.info(s"Will stop ingestion for dataset=$ref shard=$shard due to status ${status}")
} else {
// Already stopped. Send the message again in case it got dropped.
logger.info(s"Stopping ingestion again for dataset=$ref shard=$shard due to status ${status}")
sendStopMessage(shard)
}
}
}
}
Expand Down Expand Up @@ -237,9 +244,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
// Define a cancel task to run when ingestion is stopped.
val onCancel = Task {
logger.info(s"Ingestion cancel task invoked for dataset=$ref shard=$shard")
val stopped = IngestionStopped(ref, shard)
self ! stopped
statusActor ! stopped
sendStopMessage(shard)
}

val shardIngestionEnd = memStore.ingestStream(ref,
Expand Down Expand Up @@ -268,6 +273,12 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
}
}

private def sendStopMessage(shard: Int): Unit = {
val stopped = IngestionStopped(ref, shard)
self ! stopped
statusActor ! stopped
}

import Iterators._

/**
Expand Down
26 changes: 26 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/KamonLogger.scala
@@ -1,7 +1,10 @@
package filodb.coordinator

import scala.concurrent.Await

import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.metric.{MeasurementUnit, MetricSnapshot, PeriodSnapshot}
import kamon.metric.MeasurementUnit.{information, time}
import kamon.metric.MeasurementUnit.Dimension.{Information, Time}
Expand Down Expand Up @@ -119,3 +122,26 @@ object KamonLogger {
new KamonSpanLogReporter
}
}

object KamonShutdownHook extends StrictLogging {

import scala.concurrent.duration._

private val shutdownHookAdded = new java.util.concurrent.atomic.AtomicBoolean(false)
def registerShutdownHook(): Unit = {
if (shutdownHookAdded.compareAndSet(false, true)) {
logger.info(s"Registering Kamon Shutdown Hook...")
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
logger.info(s"Stopping Kamon modules - this will ensure that last few metrics are drained")
try {
Await.result(Kamon.stopModules(), 5.minutes)
logger.info(s"Finished stopping Kamon modules")
} catch { case e: Exception =>
logger.error(s"Exception when stopping Kamon Modules", e)
}
}
})
}
}
}
144 changes: 101 additions & 43 deletions coordinator/src/main/scala/filodb.coordinator/QueryActor.scala
@@ -1,5 +1,7 @@
package filodb.coordinator

import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong

import scala.util.control.NonFatal
Expand All @@ -8,16 +10,18 @@ import akka.actor.{ActorRef, ActorSystem, Props}
import akka.dispatch.{Envelope, UnboundedStablePriorityMailbox}
import com.typesafe.config.Config
import kamon.Kamon
import kamon.instrumentation.executor.ExecutorInstrumentation
import kamon.tag.TagSet
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader

import filodb.coordinator.queryplanner.SingleClusterPlanner
import filodb.core._
import filodb.core.memstore.{FiloSchedulers, MemStore, TermInfo}
import filodb.core.metadata.Schemas
import filodb.core.query.QueryContext
import filodb.core.query.{QueryConfig, QueryContext, QuerySession}
import filodb.core.store.CorruptVectorException
import filodb.query._
import filodb.query.exec.ExecPlan
Expand Down Expand Up @@ -88,63 +92,107 @@ final class QueryActor(memStore: MemStore,
val queryConfig = new QueryConfig(config.getConfig("filodb.query"))
val queryPlanner = new SingleClusterPlanner(dsRef, schemas, shardMapFunc,
earliestRawTimestampFn, queryConfig, functionalSpreadProvider)
val numSchedThreads = Math.ceil(config.getDouble("filodb.query.threads-factor") * sys.runtime.availableProcessors)
val queryScheduler = Scheduler.fixedPool(s"$QuerySchedName-$dsRef", numSchedThreads.toInt)
val queryScheduler = createInstrumentedQueryScheduler()

private val tags = Map("dataset" -> dsRef.toString)
private val lpRequests = Kamon.counter("queryactor-logicalPlan-requests").withTags(TagSet.from(tags))
private val epRequests = Kamon.counter("queryactor-execplan-requests").withTags(TagSet.from(tags))
private val resultVectors = Kamon.histogram("queryactor-result-num-rvs").withTags(TagSet.from(tags))
private val queryErrors = Kamon.counter("queryactor-query-errors").withTags(TagSet.from(tags))

/**
* Instrumentation adds following metrics on the Query Scheduler
*
* # Counter
* executor_tasks_submitted_total{type="ThreadPoolExecutor",name="query-sched-prometheus"}
* # Counter
* executor_tasks_completed_total{type="ThreadPoolExecutor",name="query-sched-prometheus"}
* # Histogram
* executor_threads_active{type="ThreadPoolExecutor",name="query-sched-prometheus"}
* # Histogram
* executor_queue_size_count{type="ThreadPoolExecutor",name="query-sched-prometheus"}
*
*/
private def createInstrumentedQueryScheduler(): SchedulerService = {
val numSchedThreads = Math.ceil(config.getDouble("filodb.query.threads-factor")
* sys.runtime.availableProcessors).toInt
val schedName = s"$QuerySchedName-$dsRef"

val thFactory = new ThreadFactory {
def newThread(r: Runnable) = {
val thread = new Thread(r)
thread.setName(s"$schedName-${thread.getId}")
thread.setDaemon(true)
thread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit =
logger.error("Uncaught Exception in Query Scheduler", e)
})
thread
}
}
// TODO retaining old fixed size pool for now - later change to fork join pool.
val executor = new ScheduledThreadPoolExecutor(numSchedThreads, thFactory)
Scheduler.apply(ExecutorInstrumentation.instrument(executor, schedName))
}

def execPhysicalPlan2(q: ExecPlan, replyTo: ActorRef): Unit = {
epRequests.increment()
Kamon.currentSpan().tag("query", q.getClass.getSimpleName)
Kamon.currentSpan().tag("query-id", q.queryContext.queryId)
q.execute(memStore, queryConfig)(queryScheduler)
.foreach { res =>
FiloSchedulers.assertThreadName(QuerySchedName)
replyTo ! res
res match {
case QueryResult(_, _, vectors) => resultVectors.record(vectors.length)
case e: QueryError =>
queryErrors.increment()
logger.debug(s"queryId ${q.queryContext.queryId} Normal QueryError returned from query execution: $e")
e.t match {
case cve: CorruptVectorException => memStore.analyzeAndLogCorruptPtr(dsRef, cve)
case t: Throwable =>
}
}
}(queryScheduler).recover { case ex =>
// Unhandled exception in query, should be rare
logger.error(s"queryId ${q.queryContext.queryId} Unhandled Query Error: ", ex)
replyTo ! QueryError(q.queryContext.queryId, ex)
}(queryScheduler)
if (checkTimeout(q.queryContext, replyTo)) {
epRequests.increment()
Kamon.currentSpan().tag("query", q.getClass.getSimpleName)
Kamon.currentSpan().tag("query-id", q.queryContext.queryId)
val querySession = QuerySession(q.queryContext, queryConfig)
q.execute(memStore, querySession)(queryScheduler)
.foreach { res =>
FiloSchedulers.assertThreadName(QuerySchedName)
querySession.close()
replyTo ! res
res match {
case QueryResult(_, _, vectors) => resultVectors.record(vectors.length)
case e: QueryError =>
queryErrors.increment()
logger.debug(s"queryId ${q.queryContext.queryId} Normal QueryError returned from query execution: $e")
e.t match {
case cve: CorruptVectorException => memStore.analyzeAndLogCorruptPtr(dsRef, cve)
case t: Throwable =>
}
}
}(queryScheduler).recover { case ex =>
querySession.close()
// Unhandled exception in query, should be rare
logger.error(s"queryId ${q.queryContext.queryId} Unhandled Query Error: ", ex)
replyTo ! QueryError(q.queryContext.queryId, ex)
}(queryScheduler)
}
}

private def processLogicalPlan2Query(q: LogicalPlan2Query, replyTo: ActorRef) = {
// This is for CLI use only. Always prefer clients to materialize logical plan
lpRequests.increment()
try {
val execPlan = queryPlanner.materialize(q.logicalPlan, q.qContext)
self forward execPlan
} catch {
case NonFatal(ex) =>
if (!ex.isInstanceOf[BadQueryException]) // dont log user errors
logger.error(s"Exception while materializing logical plan", ex)
replyTo ! QueryError("unknown", ex)
if (checkTimeout(q.qContext, replyTo)) {
// This is for CLI use only. Always prefer clients to materialize logical plan
lpRequests.increment()
try {
val execPlan = queryPlanner.materialize(q.logicalPlan, q.qContext)
self forward execPlan
} catch {
case NonFatal(ex) =>
if (!ex.isInstanceOf[BadQueryException]) // dont log user errors
logger.error(s"Exception while materializing logical plan", ex)
replyTo ! QueryError("unknown", ex)
}
}
}

private def processExplainPlanQuery(q: ExplainPlan2Query, replyTo: ActorRef): Unit = {
try {
val execPlan = queryPlanner.materialize(q.logicalPlan, q.qContext)
replyTo ! execPlan
} catch {
case NonFatal(ex) =>
if (!ex.isInstanceOf[BadQueryException]) // dont log user errors
logger.error(s"Exception while materializing logical plan", ex)
replyTo ! QueryError("unknown", ex)
if (checkTimeout(q.qContext, replyTo)) {
try {
val execPlan = queryPlanner.materialize(q.logicalPlan, q.qContext)
replyTo ! execPlan
} catch {
case NonFatal(ex) =>
if (!ex.isInstanceOf[BadQueryException]) // dont log user errors
logger.error(s"Exception while materializing logical plan", ex)
replyTo ! QueryError("unknown", ex)
}
}
}

Expand All @@ -160,6 +208,16 @@ final class QueryActor(memStore: MemStore,
}
}

def checkTimeout(queryContext: QueryContext, replyTo: ActorRef): Boolean = {
// timeout can occur here if there is a build up in actor mailbox queue and delayed delivery
val queryTimeElapsed = System.currentTimeMillis() - queryContext.submitTime
if (queryTimeElapsed >= queryContext.queryTimeoutMillis) {
replyTo ! QueryError(s"Query timeout, $queryTimeElapsed ms > ${queryContext.queryTimeoutMillis}",
QueryTimeoutException(queryTimeElapsed, this.getClass.getName))
false
} else true
}

def receive: Receive = {
case q: LogicalPlan2Query => val replyTo = sender()
processLogicalPlan2Query(q, replyTo)
Expand Down

This file was deleted.

Expand Up @@ -3,8 +3,8 @@ package filodb.coordinator.queryplanner
import com.typesafe.scalalogging.StrictLogging

import filodb.core.DatasetRef
import filodb.core.query.{PromQlQueryParams, QueryContext}
import filodb.query.{LogicalPlan, QueryConfig}
import filodb.core.query.{PromQlQueryParams, QueryConfig, QueryContext}
import filodb.query.LogicalPlan
import filodb.query.exec.{ExecPlan, InProcessPlanDispatcher, PromQlExec, StitchRvsExec}

/**
Expand Down
@@ -1,5 +1,6 @@
package filodb.coordinator.queryplanner

import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
import filodb.query._

Expand Down Expand Up @@ -29,7 +30,7 @@ object LogicalPlanUtils {
case lp: ApplyInstantFunction => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: Aggregate => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: BinaryJoin => // can assume lhs & rhs have same time
getPeriodicSeriesTimeFromLogicalPlan(lp.lhs)
getPeriodicSeriesTimeFromLogicalPlan(lp.lhs)
case lp: ScalarVectorBinaryOperation => getPeriodicSeriesTimeFromLogicalPlan(lp.vector)
case lp: ApplyMiscellaneousFunction => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
case lp: ApplySortFunction => getPeriodicSeriesTimeFromLogicalPlan(lp.vectors)
Expand Down Expand Up @@ -121,4 +122,24 @@ object LogicalPlanUtils {
case _ => 0
}
}

def getMetricName(logicalPlan: LogicalPlan, datasetMetricColumn: String): Option[Seq[String]] = {
val metricName = LogicalPlan.getLabelValueFromLogicalPlan(logicalPlan, PromMetricLabel)
if (metricName.isEmpty) LogicalPlan.getLabelValueFromLogicalPlan(logicalPlan, datasetMetricColumn)
else metricName
}

/**
* Renames Prom AST __name__ label to one based on the actual metric column of the dataset,
* if it is not the prometheus standard
*/
def renameLabels(labels: Seq[String], datasetMetricColumn: String): Seq[String] =
if (datasetMetricColumn != PromMetricLabel) {
labels map {
case PromMetricLabel => datasetMetricColumn
case other: String => other
}
} else {
labels
}
}

0 comments on commit 7388950

Please sign in to comment.