Merge Integration to Main 0.9.11
vishramachandran committed Dec 12, 2020
2 parents c478297 + 7f35cdc commit da1c628
Showing 126 changed files with 3,205 additions and 1,247 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -2,7 +2,7 @@ language: scala
dist: trusty
env:
global:
_JAVA_OPTIONS="-Dakka.test.timefactor=3 -XX:MaxMetaspaceSize=256m"
_JAVA_OPTIONS="-Dakka.test.timefactor=3 -XX:MaxMetaspaceSize=512m"
scala:
- 2.11.12
jdk:
@@ -13,6 +13,7 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.metric.MeasurementUnit
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
@@ -56,7 +57,6 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
import collection.JavaConverters._

import filodb.core.store._
import Perftools._

logger.info(s"Starting CassandraColumnStore with config ${cassandraConfig.withoutPath("password")}")

@@ -68,6 +68,13 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {

val sinkStats = new ChunkSinkStats

val writeChunksetLatency = Kamon.histogram("cass-write-chunkset-latency", MeasurementUnit.time.milliseconds)
.withoutTags()
val writePksLatency = Kamon.histogram("cass-write-part-keys-latency", MeasurementUnit.time.milliseconds)
.withoutTags()
val readChunksBatchLatency = Kamon.histogram("cassandra-per-batch-chunk-read-latency",
MeasurementUnit.time.milliseconds).withoutTags()

def initialize(dataset: DatasetRef, numShards: Int): Future[Response] = {
val chunkTable = getOrCreateChunkTable(dataset)
val partitionKeysByUpdateTimeTable = getOrCreatePartitionKeysByUpdateTimeTable(dataset)
@@ -133,49 +140,45 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
chunksets: Observable[ChunkSet],
diskTimeToLiveSeconds: Int = 259200): Future[Response] = {
chunksets.mapAsync(writeParallelism) { chunkset =>
val span = Kamon.spanBuilder("write-chunkset").asChildOf(Kamon.currentSpan()).start()
val partBytes = BinaryRegionLarge.asNewByteArray(chunkset.partition)
val future =
for { writeChunksResp <- writeChunks(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeChunksResp == Success
writeIndicesResp <- writeIndices(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeIndicesResp == Success
} yield {
span.finish()
sinkStats.chunksetWrite()
writeIndicesResp
}
Task.fromFuture(future)
}
.countL.runAsync
.map { chunksWritten =>
if (chunksWritten > 0) Success else NotApplied
val start = System.currentTimeMillis()
val partBytes = BinaryRegionLarge.asNewByteArray(chunkset.partition)
val future =
for { writeChunksResp <- writeChunks(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeChunksResp == Success
writeIndicesResp <- writeIndices(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeIndicesResp == Success
} yield {
writeChunksetLatency.record(System.currentTimeMillis() - start)
sinkStats.chunksetWrite()
writeIndicesResp
}
Task.fromFuture(future)
}
.countL.runAsync
.map { chunksWritten =>
if (chunksWritten > 0) Success else NotApplied
}
}

private def writeChunks(ref: DatasetRef,
partition: Array[Byte],
chunkset: ChunkSet,
diskTimeToLiveSeconds: Int): Future[Response] = {
asyncSubtrace("write-chunks", "ingestion") {
val chunkTable = getOrCreateChunkTable(ref)
chunkTable.writeChunks(partition, chunkset.info, chunkset.chunks, sinkStats, diskTimeToLiveSeconds)
.collect {
case Success => chunkset.invokeFlushListener(); Success
}
}
val chunkTable = getOrCreateChunkTable(ref)
chunkTable.writeChunks(partition, chunkset.info, chunkset.chunks, sinkStats, diskTimeToLiveSeconds)
.collect {
case Success => chunkset.invokeFlushListener(); Success
}
}

private def writeIndices(ref: DatasetRef,
partition: Array[Byte],
chunkset: ChunkSet,
diskTimeToLiveSeconds: Int): Future[Response] = {
asyncSubtrace("write-index", "ingestion") {
val indexTable = getOrCreateIngestionTimeIndexTable(ref)
val info = chunkset.info
val infos = Seq((info.ingestionTime, info.startTime, ChunkSetInfo.toBytes(info)))
indexTable.writeIndices(partition, infos, sinkStats, diskTimeToLiveSeconds)
}
val indexTable = getOrCreateIngestionTimeIndexTable(ref)
val info = chunkset.info
val infos = Seq((info.ingestionTime, info.startTime, ChunkSetInfo.toBytes(info)))
indexTable.writeIndices(partition, infos, sinkStats, diskTimeToLiveSeconds)
}

/**
Expand Down Expand Up @@ -212,11 +215,11 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
s"endTimeExclusive=$endTimeExclusive maxChunkTime=$maxChunkTime")
// This could be more parallel, but the decision was made to control parallelism in one place: in Spark (via its
// parallelism configuration). Revisit if needed later.
val batchReadSpan = Kamon.spanBuilder("cassandra-per-batch-data-read-latency").start()
val start = System.currentTimeMillis()
try {
chunksTable.readRawPartitionRangeBBNoAsync(parts, userTimeStart - maxChunkTime, endTimeExclusive)
} finally {
batchReadSpan.finish()
readChunksBatchLatency.record(System.currentTimeMillis() - start)
}
}
}
@@ -376,7 +379,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
writeToPkUTTable: Boolean = true): Future[Response] = {
val pkTable = getOrCreatePartitionKeysTable(ref, shard)
val pkByUTTable = getOrCreatePartitionKeysByUpdateTimeTable(ref)
val span = Kamon.spanBuilder("write-part-keys").asChildOf(Kamon.currentSpan()).start()
val start = System.currentTimeMillis()
val ret = partKeys.mapAsync(writeParallelism) { pk =>
val ttl = if (pk.endTime == Long.MaxValue) -1 else diskTTLSeconds
// caller needs to supply hash for partKey - cannot be None
Expand All @@ -393,7 +396,9 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
resp
}
}.findL(_.isInstanceOf[ErrorResponse]).map(_.getOrElse(Success)).runAsync
ret.onComplete(_ => span.finish())
ret.onComplete { _ =>
writePksLatency.record(System.currentTimeMillis() - start)
}
ret
}

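The change repeated across CassandraColumnStore (and again in IngestionActor further down) swaps per-operation Kamon spans for latency histograms fed with wall-clock timestamps. A minimal sketch of that pattern follows; the metric names, tag values and the timed helper are illustrative only, not part of this commit.

import kamon.Kamon
import kamon.metric.{Histogram, MeasurementUnit}

object LatencyHistogramSketch extends App {
  // Untagged histogram, declared once per metric (as with cass-write-chunkset-latency above)
  val writeLatency: Histogram =
    Kamon.histogram("example-write-latency", MeasurementUnit.time.milliseconds).withoutTags()

  // Tagged variant, as used for the per-shard recovery latency in IngestionActor
  val recoveryLatency: Histogram =
    Kamon.histogram("example-recovery-latency", MeasurementUnit.time.milliseconds)
      .withTag("dataset", "prometheus")
      .withTag("shard", 0)

  // Time a block and record the elapsed wall-clock milliseconds, even if it throws
  def timed[T](histogram: Histogram)(block: => T): T = {
    val start = System.currentTimeMillis()
    try block
    finally histogram.record(System.currentTimeMillis() - start)
  }

  timed(writeLatency) { Thread.sleep(5) }      // records ~5 ms
  timed(recoveryLatency) { Thread.sleep(10) }  // records ~10 ms
}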
@@ -1,22 +1,20 @@
package filodb.cassandra.columnstore

import scala.concurrent.Future

import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler
import monix.reactive.Observable
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}

import filodb.cassandra.DefaultFiloSessionProvider
import filodb.core.{MachineMetricsData, TestData}
import filodb.core.binaryrecord2.{BinaryRecordRowReader, RecordBuilder}
import filodb.core.downsample.OffHeapMemory
import filodb.core.memstore._
import filodb.core.memstore.FiloSchedulers.QuerySchedName
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.query.{ColumnFilter, QueryConfig, QueryContext, QuerySession}
import filodb.core.query.{ColumnFilter, PlannerParams, QueryConfig, QueryContext, QuerySession}
import filodb.core.query.Filter.Equals
import filodb.core.store.{InMemoryMetaStore, PartKeyRecord, StoreConfig, TimeRangeChunkScan}
import filodb.memory.format.ZeroCopyUTF8String._
@@ -84,7 +82,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala

MachineMetricsData.records(dataset, rawSamples).records.foreach { case (base, offset) =>
val rr = new BinaryRecordRowReader(Schemas.gauge.ingestionSchema, base, offset)
part.ingest( System.currentTimeMillis(), rr, offheapMem.blockMemFactory)
part.ingest( System.currentTimeMillis(), rr, offheapMem.blockMemFactory, false, Option.empty)
part.switchBuffers(offheapMem.blockMemFactory, true)
}
val chunks = part.makeFlushChunks(offheapMem.blockMemFactory)
@@ -187,8 +185,9 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
def query(memStore: TimeSeriesMemStore): Future[QueryResponse] = {
val colFilters = seriesTags.map { case (t, v) => ColumnFilter(t.toString, Equals(v.toString)) }.toSeq
val queryFilters = colFilters :+ ColumnFilter("_metric_", Equals(gaugeName))
val exec = MultiSchemaPartitionsExec(QueryContext(sampleLimit = numSamples * 2), InProcessPlanDispatcher,
dataset.ref, 0, queryFilters, TimeRangeChunkScan(firstSampleTime, firstSampleTime + 2 * numSamples))
val exec = MultiSchemaPartitionsExec(QueryContext(plannerParams = PlannerParams(sampleLimit = numSamples * 2)),
InProcessPlanDispatcher, dataset.ref, 0, queryFilters, TimeRangeChunkScan(firstSampleTime, firstSampleTime + 2 *
numSamples))
val queryConfig = new QueryConfig(config.getConfig("query"))
val querySession = QuerySession(QueryContext(), queryConfig)
exec.execute(memStore, querySession)(queryScheduler).runAsync(queryScheduler)
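OdpSpec above and CliMain below track the same API move: per-query options such as sampleLimit no longer sit directly on QueryContext but are grouped under PlannerParams. A minimal sketch of the new construction, using only the fields visible in this diff:

import filodb.core.query.{PlannerParams, QueryContext}

object QueryContextSketch {
  // Old style, removed in this commit:  QueryContext(sampleLimit = numSamples * 2)
  // New style: planner-facing options are wrapped in PlannerParams
  val qContext: QueryContext =
    QueryContext(plannerParams = PlannerParams(sampleLimit = 2000))
}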
32 changes: 27 additions & 5 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -30,7 +30,6 @@ import filodb.query._
// scalastyle:off
class Arguments(args: Seq[String]) extends ScallopConf(args) {


val dataset = opt[String]()
val database = opt[String]()
val command = opt[String]()
@@ -64,6 +63,9 @@ class Arguments(args: Seq[String]) extends ScallopConf(args) {
val everynseconds = opt[String]()
val shards = opt[List[String]]()
val spread = opt[Int]()
val k = opt[Int]()
val shardkeyprefix = opt[List[String]](default = Some(List()))

verify()

override def onError(e: Throwable): Unit = e match {
@@ -101,6 +103,7 @@ object CliMain extends FilodbClusterNode {
println(" --host <hostname/IP> [--port ...] --command list")
println(" --host <hostname/IP> [--port ...] --command status --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command labelvalues --labelName <lable-names> --labelfilter <label-filter> --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command topkcard --dataset prometheus --k 2 --shardkeyprefix demo App-0")
println(""" --command promFilterToPartKeyBR --promql "myMetricName{_ws_='myWs',_ns_='myNs'}" --schema prom-counter""")
println(""" --command partKeyBrAsString --hexpk 0x2C0000000F1712000000200000004B8B36940C006D794D65747269634E616D650E00C104006D794E73C004006D795773""")
println(""" --command decodeChunkInfo --hexchunkinfo 0x12e8253a267ea2db060000005046fc896e0100005046fc896e010000""")
@@ -132,8 +135,8 @@ object CliMain extends FilodbClusterNode {
}

def main(rawArgs: Array[String]): Unit = {
val args = new Arguments(rawArgs)
try {
val args = new Arguments(rawArgs)
val timeout = args.timeoutseconds().seconds
args.command.toOption match {
case Some("init") =>
@@ -165,6 +168,23 @@ object CliMain extends FilodbClusterNode {
val values = remote.getIndexValues(ref, args.indexname(), args.shards().head.toInt, args.limit())
values.foreach { case (term, freq) => println(f"$term%40s\t$freq") }

case Some("topkcard") =>
require(args.host.isDefined && args.dataset.isDefined && args.k.isDefined,
"--host, --dataset, --k must be defined")
val (remote, ref) = getClientAndRef(args)
val res = remote.getTopkCardinality(ref, args.shards.getOrElse(Nil).map(_.toInt),
args.shardkeyprefix(), args.k())
println(s"ShardKeyPrefix: ${args.shardkeyprefix}")
res.groupBy(_.shard).foreach { crs =>
println(s"Shard: ${crs._1}")
printf("%40s %12s %10s %10s\n", "Child", "TimeSeries", "Children", "Children")
printf("%40s %12s %10s %10s\n", "Name", "Count", "Count", "Quota")
println("===================================================================================")
crs._2.foreach { cr =>
printf("%40s %12d %10d %10d\n", cr.childName, cr.timeSeriesCount, cr.childrenCount, cr.childrenQuota)
}
}

case Some("status") =>
val (remote, ref) = getClientAndRef(args)
dumpShardStatus(remote, ref)
@@ -349,9 +369,11 @@ object CliMain extends FilodbClusterNode {
options: QOptions, tsdbQueryParams: TsdbQueryParams): Unit = {
val ref = DatasetRef(dataset)
val spreadProvider: Option[SpreadProvider] = options.spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))
val qOpts = QueryContext(tsdbQueryParams, spreadProvider, options.sampleLimit)
.copy(queryTimeoutMillis = options.timeout.toMillis.toInt,
shardOverrides = options.shardOverrides)

val qOpts = QueryContext(origQueryParams = tsdbQueryParams,
plannerParams = PlannerParams(applicationId = "filodb-cli", spreadOverride = spreadProvider,
sampleLimit = options.sampleLimit, queryTimeoutMillis = options.timeout.toMillis.toInt,
shardOverrides = options.shardOverrides))
println(s"Sending query command to server for $ref with options $qOpts...")
println(s"Query Plan:\n$plan")
options.everyN match {
3 changes: 3 additions & 0 deletions conf/timeseries-dev-source.conf
@@ -97,6 +97,9 @@

# Limits maximum amount of data a single leaf query can scan
max-data-per-shard-query = 50 MB

# Set to true to enable metering of time series. Used for rate-limiting
metering-enabled = true
}
downsample {
# Resolutions for downsampled data ** in ascending order **
16 changes: 16 additions & 0 deletions conf/timeseries-filodb-server.conf
@@ -11,6 +11,22 @@ filodb {
"conf/timeseries-dev-source.conf"
]

quotas {
prometheus {
defaults = [100, 500, 10000, 100000]
custom = [
{
shardKeyPrefix = ["demo", "App-0", "heap_usage"]
quota = 100
},
{
shardKeyPrefix = ["demo"]
quota = 10
}
]
}
}

spread-default = 1

# Override the default spread per application using an override block, which holds the non-metric shard keys and the spread.
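The quotas block above feeds the new time-series metering: defaults appears to define a quota per shard-key depth, while custom pins quotas for specific prefixes. The sketch below shows one way such a block can be read with Typesafe Config; the exact-match-then-default resolution is an assumption made for illustration, not necessarily FiloDB's semantics.

import scala.collection.JavaConverters._
import com.typesafe.config.{Config, ConfigFactory}

object QuotaConfigSketch extends App {
  // Same shape as the prometheus quota block in timeseries-filodb-server.conf
  val quotaConf: Config = ConfigFactory.parseString(
    """
      |defaults = [100, 500, 10000, 100000]
      |custom = [
      |  { shardKeyPrefix = ["demo", "App-0", "heap_usage"], quota = 100 },
      |  { shardKeyPrefix = ["demo"], quota = 10 }
      |]
    """.stripMargin)

  val defaults = quotaConf.getIntList("defaults").asScala.map(_.intValue)
  val custom = quotaConf.getConfigList("custom").asScala.map { c =>
    c.getStringList("shardKeyPrefix").asScala.toList -> c.getInt("quota")
  }

  // Assumed resolution for the sketch: an exact custom prefix wins, otherwise
  // fall back to the default for that prefix depth.
  def quotaFor(prefix: List[String]): Int =
    custom.collectFirst { case (p, q) if p == prefix => q }
      .getOrElse(defaults(prefix.length))

  println(quotaFor(List("demo")))            // 10     (custom override)
  println(quotaFor(List("demo", "App-0")))   // 10000  (default at depth 2)
}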
@@ -11,6 +11,7 @@ import scala.util.control.NonFatal
import akka.actor.{ActorRef, Props}
import akka.event.LoggingReceive
import kamon.Kamon
import kamon.metric.MeasurementUnit
import monix.eval.Task
import monix.execution.{CancelableFuture, Scheduler, UncaughtExceptionReporter}
import monix.reactive.Observable
@@ -293,10 +294,12 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
private def doRecovery(shard: Int, startOffset: Long, endOffset: Long, interval: Long,
checkpoints: Map[Int, Long]): Future[Option[Long]] = {
val futTry = create(shard, Some(startOffset)) map { ingestionStream =>
val recoveryTrace = Kamon.spanBuilder("ingestion-recovery-trace")
.asChildOf(Kamon.currentSpan())
.tag("shard", shard.toString)
.tag("dataset", ref.toString).start()

val ingestionRecoveryLatency = Kamon.histogram("ingestion-recovery-latency", MeasurementUnit.time.milliseconds)
.withTag("dataset", ref.dataset)
.withTag("shard", shard)

val recoveryStart = System.currentTimeMillis()
val stream = ingestionStream.get
statusActor ! RecoveryInProgress(ref, shard, nodeCoord, 0)

@@ -317,12 +320,11 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
logger.info(s"Finished recovery for dataset=$ref shard=$shard")
ingestionStream.teardown()
streams.remove(shard)
recoveryTrace.finish()
ingestionRecoveryLatency.record(System.currentTimeMillis() - recoveryStart)
case Failure(ex) =>
recoveryTrace.fail(s"Recovery failed for dataset=$ref shard=$shard", ex)
logger.error(s"Recovery failed for dataset=$ref shard=$shard", ex)
handleError(ref, shard, ex)
recoveryTrace.finish()
ingestionRecoveryLatency.record(System.currentTimeMillis() - recoveryStart)
}(actorDispatcher)
fut
}
@@ -8,6 +8,7 @@ import scala.concurrent.duration._
import akka.actor.{ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.event.LoggingReceive
import kamon.Kamon
import net.ceedubs.ficus.Ficus._

import filodb.coordinator.client.MiscCommands
@@ -195,6 +196,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
def queryHandlers: Receive = LoggingReceive {
case q: QueryCommand =>
val originator = sender()
Kamon.currentSpan().mark("NodeCoordinatorActor received query")
withQueryActor(originator, q.dataset) { _.tell(q, originator) }
case QueryActor.ThrowException(dataset) =>
val originator = sender()
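The one-line NodeCoordinatorActor change annotates the query's existing trace rather than starting a new one: Kamon.currentSpan() picks up the span propagated with the request and mark() attaches a timestamped event to it. A small standalone sketch of the same calls, creating a span explicitly since there is no propagated context here:

import kamon.Kamon
import kamon.trace.Span

object SpanMarkSketch extends App {
  // In the actor the span already exists and is obtained via Kamon.currentSpan();
  // here one is created explicitly so there is something to mark.
  val span: Span = Kamon.spanBuilder("handle-query").start()
  span.mark("NodeCoordinatorActor received query")  // attach a timestamped event
  // ... hand the query off to the QueryActor ...
  span.finish()
}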
