Merge branch 'integration' into main-0-9-11
vishramachandran committed Dec 11, 2020
2 parents c478297 + d7a39b5 commit 7f35cdc
Showing 126 changed files with 3,205 additions and 1,247 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -2,7 +2,7 @@ language: scala
dist: trusty
env:
global:
_JAVA_OPTIONS="-Dakka.test.timefactor=3 -XX:MaxMetaspaceSize=256m"
_JAVA_OPTIONS="-Dakka.test.timefactor=3 -XX:MaxMetaspaceSize=512m"
scala:
- 2.11.12
jdk:
@@ -13,6 +13,7 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.metric.MeasurementUnit
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
@@ -56,7 +57,6 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
import collection.JavaConverters._

import filodb.core.store._
import Perftools._

logger.info(s"Starting CassandraColumnStore with config ${cassandraConfig.withoutPath("password")}")

@@ -68,6 +68,13 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {

val sinkStats = new ChunkSinkStats

val writeChunksetLatency = Kamon.histogram("cass-write-chunkset-latency", MeasurementUnit.time.milliseconds)
.withoutTags()
val writePksLatency = Kamon.histogram("cass-write-part-keys-latency", MeasurementUnit.time.milliseconds)
.withoutTags()
val readChunksBatchLatency = Kamon.histogram("cassandra-per-batch-chunk-read-latency",
MeasurementUnit.time.milliseconds).withoutTags()

def initialize(dataset: DatasetRef, numShards: Int): Future[Response] = {
val chunkTable = getOrCreateChunkTable(dataset)
val partitionKeysByUpdateTimeTable = getOrCreatePartitionKeysByUpdateTimeTable(dataset)
@@ -133,49 +140,45 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
chunksets: Observable[ChunkSet],
diskTimeToLiveSeconds: Int = 259200): Future[Response] = {
chunksets.mapAsync(writeParallelism) { chunkset =>
val span = Kamon.spanBuilder("write-chunkset").asChildOf(Kamon.currentSpan()).start()
val partBytes = BinaryRegionLarge.asNewByteArray(chunkset.partition)
val future =
for { writeChunksResp <- writeChunks(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeChunksResp == Success
writeIndicesResp <- writeIndices(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeIndicesResp == Success
} yield {
span.finish()
sinkStats.chunksetWrite()
writeIndicesResp
}
Task.fromFuture(future)
}
.countL.runAsync
.map { chunksWritten =>
if (chunksWritten > 0) Success else NotApplied
val start = System.currentTimeMillis()
val partBytes = BinaryRegionLarge.asNewByteArray(chunkset.partition)
val future =
for { writeChunksResp <- writeChunks(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeChunksResp == Success
writeIndicesResp <- writeIndices(ref, partBytes, chunkset, diskTimeToLiveSeconds)
if writeIndicesResp == Success
} yield {
writeChunksetLatency.record(System.currentTimeMillis() - start)
sinkStats.chunksetWrite()
writeIndicesResp
}
Task.fromFuture(future)
}
.countL.runAsync
.map { chunksWritten =>
if (chunksWritten > 0) Success else NotApplied
}
}

private def writeChunks(ref: DatasetRef,
partition: Array[Byte],
chunkset: ChunkSet,
diskTimeToLiveSeconds: Int): Future[Response] = {
asyncSubtrace("write-chunks", "ingestion") {
val chunkTable = getOrCreateChunkTable(ref)
chunkTable.writeChunks(partition, chunkset.info, chunkset.chunks, sinkStats, diskTimeToLiveSeconds)
.collect {
case Success => chunkset.invokeFlushListener(); Success
}
}
val chunkTable = getOrCreateChunkTable(ref)
chunkTable.writeChunks(partition, chunkset.info, chunkset.chunks, sinkStats, diskTimeToLiveSeconds)
.collect {
case Success => chunkset.invokeFlushListener(); Success
}
}

private def writeIndices(ref: DatasetRef,
partition: Array[Byte],
chunkset: ChunkSet,
diskTimeToLiveSeconds: Int): Future[Response] = {
asyncSubtrace("write-index", "ingestion") {
val indexTable = getOrCreateIngestionTimeIndexTable(ref)
val info = chunkset.info
val infos = Seq((info.ingestionTime, info.startTime, ChunkSetInfo.toBytes(info)))
indexTable.writeIndices(partition, infos, sinkStats, diskTimeToLiveSeconds)
}
val indexTable = getOrCreateIngestionTimeIndexTable(ref)
val info = chunkset.info
val infos = Seq((info.ingestionTime, info.startTime, ChunkSetInfo.toBytes(info)))
indexTable.writeIndices(partition, infos, sinkStats, diskTimeToLiveSeconds)
}

/**
Expand Down Expand Up @@ -212,11 +215,11 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
s"endTimeExclusive=$endTimeExclusive maxChunkTime=$maxChunkTime")
// This could be more parallel, but decision was made to control parallelism at one place: In spark (via its
// parallelism configuration. Revisit if needed later.
val batchReadSpan = Kamon.spanBuilder("cassandra-per-batch-data-read-latency").start()
val start = System.currentTimeMillis()
try {
chunksTable.readRawPartitionRangeBBNoAsync(parts, userTimeStart - maxChunkTime, endTimeExclusive)
} finally {
batchReadSpan.finish()
readChunksBatchLatency.record(System.currentTimeMillis() - start)
}
}
}
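A recurring change in this commit, seen throughout this file, is replacing per-operation Kamon spans (spanBuilder(...).start() / finish()) with pre-declared Kamon histograms and explicit wall-clock timing. The sketch below distils that pattern using one of the metric names from the diff; the wrapper object and the timed helper are illustrative assumptions, not code from the commit.

```scala
import kamon.Kamon
import kamon.metric.MeasurementUnit

object LatencyRecordingSketch {
  // Declared once and reused; withoutTags() yields the plain, untagged instrument,
  // matching writeChunksetLatency above.
  private val writeChunksetLatency =
    Kamon.histogram("cass-write-chunkset-latency", MeasurementUnit.time.milliseconds).withoutTags()

  // Brackets a block with System.currentTimeMillis() and records the elapsed
  // milliseconds, as the chunkset write path above now does instead of finishing a span.
  def timed[T](block: => T): T = {
    val start = System.currentTimeMillis()
    try block
    finally writeChunksetLatency.record(System.currentTimeMillis() - start)
  }
}
```

Unlike a span, the histogram records a value on every call regardless of trace sampling and avoids allocating a span per chunkset, which is presumably the motivation for the switch.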
@@ -376,7 +379,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
writeToPkUTTable: Boolean = true): Future[Response] = {
val pkTable = getOrCreatePartitionKeysTable(ref, shard)
val pkByUTTable = getOrCreatePartitionKeysByUpdateTimeTable(ref)
val span = Kamon.spanBuilder("write-part-keys").asChildOf(Kamon.currentSpan()).start()
val start = System.currentTimeMillis()
val ret = partKeys.mapAsync(writeParallelism) { pk =>
val ttl = if (pk.endTime == Long.MaxValue) -1 else diskTTLSeconds
// caller needs to supply hash for partKey - cannot be None
@@ -393,7 +396,9 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
resp
}
}.findL(_.isInstanceOf[ErrorResponse]).map(_.getOrElse(Success)).runAsync
ret.onComplete(_ => span.finish())
ret.onComplete { _ =>
writePksLatency.record(System.currentTimeMillis() - start)
}
ret
}

@@ -1,22 +1,20 @@
package filodb.cassandra.columnstore

import scala.concurrent.Future

import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler
import monix.reactive.Observable
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}

import filodb.cassandra.DefaultFiloSessionProvider
import filodb.core.{MachineMetricsData, TestData}
import filodb.core.binaryrecord2.{BinaryRecordRowReader, RecordBuilder}
import filodb.core.downsample.OffHeapMemory
import filodb.core.memstore._
import filodb.core.memstore.FiloSchedulers.QuerySchedName
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.query.{ColumnFilter, QueryConfig, QueryContext, QuerySession}
import filodb.core.query.{ColumnFilter, PlannerParams, QueryConfig, QueryContext, QuerySession}
import filodb.core.query.Filter.Equals
import filodb.core.store.{InMemoryMetaStore, PartKeyRecord, StoreConfig, TimeRangeChunkScan}
import filodb.memory.format.ZeroCopyUTF8String._
@@ -84,7 +82,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala

MachineMetricsData.records(dataset, rawSamples).records.foreach { case (base, offset) =>
val rr = new BinaryRecordRowReader(Schemas.gauge.ingestionSchema, base, offset)
part.ingest( System.currentTimeMillis(), rr, offheapMem.blockMemFactory)
part.ingest( System.currentTimeMillis(), rr, offheapMem.blockMemFactory, false, Option.empty)
part.switchBuffers(offheapMem.blockMemFactory, true)
}
val chunks = part.makeFlushChunks(offheapMem.blockMemFactory)
@@ -187,8 +185,9 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
def query(memStore: TimeSeriesMemStore): Future[QueryResponse] = {
val colFilters = seriesTags.map { case (t, v) => ColumnFilter(t.toString, Equals(v.toString)) }.toSeq
val queryFilters = colFilters :+ ColumnFilter("_metric_", Equals(gaugeName))
val exec = MultiSchemaPartitionsExec(QueryContext(sampleLimit = numSamples * 2), InProcessPlanDispatcher,
dataset.ref, 0, queryFilters, TimeRangeChunkScan(firstSampleTime, firstSampleTime + 2 * numSamples))
val exec = MultiSchemaPartitionsExec(QueryContext(plannerParams = PlannerParams(sampleLimit = numSamples * 2)),
InProcessPlanDispatcher, dataset.ref, 0, queryFilters, TimeRangeChunkScan(firstSampleTime, firstSampleTime + 2 *
numSamples))
val queryConfig = new QueryConfig(config.getConfig("query"))
val querySession = QuerySession(QueryContext(), queryConfig)
exec.execute(memStore, querySession)(queryScheduler).runAsync(queryScheduler)
32 changes: 27 additions & 5 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -30,7 +30,6 @@ import filodb.query._
// scalastyle:off
class Arguments(args: Seq[String]) extends ScallopConf(args) {


val dataset = opt[String]()
val database = opt[String]()
val command = opt[String]()
@@ -64,6 +63,9 @@ class Arguments(args: Seq[String]) extends ScallopConf(args) {
val everynseconds = opt[String]()
val shards = opt[List[String]]()
val spread = opt[Int]()
val k = opt[Int]()
val shardkeyprefix = opt[List[String]](default = Some(List()))

verify()

override def onError(e: Throwable): Unit = e match {
@@ -101,6 +103,7 @@ object CliMain extends FilodbClusterNode {
println(" --host <hostname/IP> [--port ...] --command list")
println(" --host <hostname/IP> [--port ...] --command status --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command labelvalues --labelName <lable-names> --labelfilter <label-filter> --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command topkcard --dataset prometheus --k 2 --shardkeyprefix demo App-0")
println(""" --command promFilterToPartKeyBR --promql "myMetricName{_ws_='myWs',_ns_='myNs'}" --schema prom-counter""")
println(""" --command partKeyBrAsString --hexpk 0x2C0000000F1712000000200000004B8B36940C006D794D65747269634E616D650E00C104006D794E73C004006D795773""")
println(""" --command decodeChunkInfo --hexchunkinfo 0x12e8253a267ea2db060000005046fc896e0100005046fc896e010000""")
@@ -132,8 +135,8 @@ object CliMain extends FilodbClusterNode {
}

def main(rawArgs: Array[String]): Unit = {
val args = new Arguments(rawArgs)
try {
val args = new Arguments(rawArgs)
val timeout = args.timeoutseconds().seconds
args.command.toOption match {
case Some("init") =>
@@ -165,6 +168,23 @@ object CliMain extends FilodbClusterNode {
val values = remote.getIndexValues(ref, args.indexname(), args.shards().head.toInt, args.limit())
values.foreach { case (term, freq) => println(f"$term%40s\t$freq") }

case Some("topkcard") =>
require(args.host.isDefined && args.dataset.isDefined && args.k.isDefined,
"--host, --dataset, --k must be defined")
val (remote, ref) = getClientAndRef(args)
val res = remote.getTopkCardinality(ref, args.shards.getOrElse(Nil).map(_.toInt),
args.shardkeyprefix(), args.k())
println(s"ShardKeyPrefix: ${args.shardkeyprefix}")
res.groupBy(_.shard).foreach { crs =>
println(s"Shard: ${crs._1}")
printf("%40s %12s %10s %10s\n", "Child", "TimeSeries", "Children", "Children")
printf("%40s %12s %10s %10s\n", "Name", "Count", "Count", "Quota")
println("===================================================================================")
crs._2.foreach { cr =>
printf("%40s %12d %10d %10d\n", cr.childName, cr.timeSeriesCount, cr.childrenCount, cr.childrenQuota)
}
}

case Some("status") =>
val (remote, ref) = getClientAndRef(args)
dumpShardStatus(remote, ref)
@@ -349,9 +369,11 @@ object CliMain extends FilodbClusterNode {
options: QOptions, tsdbQueryParams: TsdbQueryParams): Unit = {
val ref = DatasetRef(dataset)
val spreadProvider: Option[SpreadProvider] = options.spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))
val qOpts = QueryContext(tsdbQueryParams, spreadProvider, options.sampleLimit)
.copy(queryTimeoutMillis = options.timeout.toMillis.toInt,
shardOverrides = options.shardOverrides)

val qOpts = QueryContext(origQueryParams = tsdbQueryParams,
plannerParams = PlannerParams(applicationId = "filodb-cli", spreadOverride = spreadProvider,
sampleLimit = options.sampleLimit, queryTimeoutMillis = options.timeout.toMillis.toInt,
shardOverrides = options.shardOverrides))
println(s"Sending query command to server for $ref with options $qOpts...")
println(s"Query Plan:\n$plan")
options.everyN match {
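The rewritten qOpts above (and the matching change in OdpSpec earlier in the commit) reflects an API change: per-query options such as spread, sample limit, timeout, and shard overrides now live inside a nested PlannerParams instead of directly on QueryContext. Below is a minimal, worksheet-style sketch of the new call shape using only fields that appear in this diff; the placeholder values and the assumption that the remaining parameters have defaults are mine, not the commit's.

```scala
import filodb.core.query.{PlannerParams, QueryContext}

// Placeholder values stand in for CliMain's parsed CLI options.
val qOpts = QueryContext(
  plannerParams = PlannerParams(
    applicationId = "filodb-cli",
    sampleLimit = 1000000,
    queryTimeoutMillis = 60000))
```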
3 changes: 3 additions & 0 deletions conf/timeseries-dev-source.conf
@@ -97,6 +97,9 @@

# Limits maximum amount of data a single leaf query can scan
max-data-per-shard-query = 50 MB

# Set to true to enable metering of time series. Used for rate-limiting
metering-enabled = true
}
downsample {
# Resolutions for downsampled data ** in ascending order **
16 changes: 16 additions & 0 deletions conf/timeseries-filodb-server.conf
@@ -11,6 +11,22 @@ filodb {
"conf/timeseries-dev-source.conf"
]

quotas {
prometheus {
defaults = [100, 500, 10000, 100000]
custom = [
{
shardKeyPrefix = ["demo", "App-0", "heap_usage"]
quota = 100
},
{
shardKeyPrefix = ["demo"]
quota = 10
}
]
}
}

spread-default = 1

# Override default spread for application using override block which will have non metric shard keys and spread.
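The quotas block added above pairs a defaults list (apparently one limit per shard-key prefix depth) with custom overrides keyed by shardKeyPrefix. The resolution logic is not part of this hunk, so the following is only a plausible sketch of how such a config might be consulted, with an exact-prefix override winning over the depth-based default; every name in it is hypothetical rather than FiloDB's actual API.

```scala
// Hypothetical illustration of quota lookup against the config above;
// not FiloDB's actual implementation.
final case class QuotaRule(shardKeyPrefix: Seq[String], quota: Int)

object QuotaSketch {
  val defaults: IndexedSeq[Int] = IndexedSeq(100, 500, 10000, 100000)
  val custom: Seq[QuotaRule] = Seq(
    QuotaRule(Seq("demo", "App-0", "heap_usage"), 100),
    QuotaRule(Seq("demo"), 10))

  /** Quota for the node identified by `prefix`: a custom rule wins, else the default for that depth. */
  def quotaFor(prefix: Seq[String]): Int =
    custom.find(_.shardKeyPrefix == prefix).map(_.quota)
      .getOrElse(defaults(math.min(prefix.length, defaults.length - 1)))
}

// e.g. QuotaSketch.quotaFor(Seq("demo")) == 10, while QuotaSketch.quotaFor(Seq("other")) == 500
```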
@@ -11,6 +11,7 @@ import scala.util.control.NonFatal
import akka.actor.{ActorRef, Props}
import akka.event.LoggingReceive
import kamon.Kamon
import kamon.metric.MeasurementUnit
import monix.eval.Task
import monix.execution.{CancelableFuture, Scheduler, UncaughtExceptionReporter}
import monix.reactive.Observable
@@ -293,10 +294,12 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
private def doRecovery(shard: Int, startOffset: Long, endOffset: Long, interval: Long,
checkpoints: Map[Int, Long]): Future[Option[Long]] = {
val futTry = create(shard, Some(startOffset)) map { ingestionStream =>
val recoveryTrace = Kamon.spanBuilder("ingestion-recovery-trace")
.asChildOf(Kamon.currentSpan())
.tag("shard", shard.toString)
.tag("dataset", ref.toString).start()

val ingestionRecoveryLatency = Kamon.histogram("ingestion-recovery-latency", MeasurementUnit.time.milliseconds)
.withTag("dataset", ref.dataset)
.withTag("shard", shard)

val recoveryStart = System.currentTimeMillis()
val stream = ingestionStream.get
statusActor ! RecoveryInProgress(ref, shard, nodeCoord, 0)

@@ -317,12 +320,11 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
logger.info(s"Finished recovery for dataset=$ref shard=$shard")
ingestionStream.teardown()
streams.remove(shard)
recoveryTrace.finish()
ingestionRecoveryLatency.record(System.currentTimeMillis() - recoveryStart)
case Failure(ex) =>
recoveryTrace.fail(s"Recovery failed for dataset=$ref shard=$shard", ex)
logger.error(s"Recovery failed for dataset=$ref shard=$shard", ex)
handleError(ref, shard, ex)
recoveryTrace.finish()
ingestionRecoveryLatency.record(System.currentTimeMillis() - recoveryStart)
}(actorDispatcher)
fut
}
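The same span-to-histogram substitution is applied here, but the recovery histogram is tagged with the dataset and shard instead of using withoutTags(), so latency can be broken out per shard. A short worksheet-style sketch of that tagged usage, with placeholder tag values where IngestionActor uses its DatasetRef and shard number:

```scala
import kamon.Kamon
import kamon.metric.MeasurementUnit

// Placeholder dataset name and shard; tag keys match those used in doRecovery above.
val ingestionRecoveryLatency = Kamon
  .histogram("ingestion-recovery-latency", MeasurementUnit.time.milliseconds)
  .withTag("dataset", "prometheus")
  .withTag("shard", 0)

val recoveryStart = System.currentTimeMillis()
// ... run the recovery stream to completion ...
ingestionRecoveryLatency.record(System.currentTimeMillis() - recoveryStart)
```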
@@ -8,6 +8,7 @@ import scala.concurrent.duration._
import akka.actor.{ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.event.LoggingReceive
import kamon.Kamon
import net.ceedubs.ficus.Ficus._

import filodb.coordinator.client.MiscCommands
@@ -195,6 +196,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
def queryHandlers: Receive = LoggingReceive {
case q: QueryCommand =>
val originator = sender()
Kamon.currentSpan().mark("NodeCoordinatorActor received query")
withQueryActor(originator, q.dataset) { _.tell(q, originator) }
case QueryActor.ThrowException(dataset) =>
val originator = sender()
