Merge branch 'integration' into main 0.9.10
sherali42 committed Nov 9, 2020
2 parents de5f8a0 + efaa548 commit c478297
Showing 121 changed files with 2,203 additions and 906 deletions.
@@ -2,6 +2,7 @@ package filodb.akkabootstrapper.multijvm

import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor.AddressFromURIString
import akka.cluster.Cluster
import akka.http.scaladsl.Http
@@ -64,6 +64,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
private val pkByUTNumSplits = cassandraConfig.getInt("pk-by-updated-time-table-num-splits")
private val pkByUTTtlSeconds = cassandraConfig.getDuration("pk-by-updated-time-table-ttl", TimeUnit.SECONDS).toInt
private val createTablesEnabled = cassandraConfig.getBoolean("create-tables-enabled")
private val numTokenRangeSplitsForScans = cassandraConfig.getInt("num-token-range-splits-for-scans")

val sinkStats = new ChunkSinkStats

@@ -316,7 +317,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
* @param splitsPerNode - how much parallelism or ways to divide a token range on each node
* @return each split will have token_start, token_end, replicas filled in
*/
def getScanSplits(dataset: DatasetRef, splitsPerNode: Int = 1): Seq[ScanSplit] = {
def getScanSplits(dataset: DatasetRef, splitsPerNode: Int = numTokenRangeSplitsForScans): Seq[ScanSplit] = {
val keyspace = clusterConnector.keyspace
require(splitsPerNode >= 1, s"Must specify at least 1 splits_per_node, got $splitsPerNode")

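For context on the hunk above: getScanSplits previously defaulted to a single split per node, and the new `num-token-range-splits-for-scans` setting makes that default configurable. Below is a minimal sketch of the setting being read with the same Typesafe Config API used in the hunk; the value 4 and the way the Config object is built here are assumptions for illustration, not part of the commit.

import com.typesafe.config.ConfigFactory

// Illustrative only: the key name comes from the diff above; the value and
// the parseString construction are made up for the example.
val cassandraConfig = ConfigFactory.parseString(
  "num-token-range-splits-for-scans = 4"
)

// Read once at construction time, as in CassandraColumnStore, and then used
// as the default splitsPerNode whenever getScanSplits(dataset) is called
// without an explicit value.
val numTokenRangeSplitsForScans =
  cassandraConfig.getInt("num-token-range-splits-for-scans")
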
169 changes: 91 additions & 78 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -8,8 +8,9 @@ import scala.concurrent.duration._
import scala.util.Try

import com.opencsv.CSVWriter
import com.quantifind.sumac.{ArgMain, FieldArgs}
import monix.reactive.Observable
import org.rogach.scallop.ScallopConf
import org.rogach.scallop.exceptions.ScallopException
import org.scalactic._

import filodb.coordinator._
@@ -27,41 +28,53 @@ import filodb.prometheus.parse.Parser
import filodb.query._

// scalastyle:off
class Arguments extends FieldArgs {
var dataset: Option[String] = None
var database: Option[String] = None
var command: Option[String] = None
var filename: Option[String] = None
var configPath: Option[String] = None
class Arguments(args: Seq[String]) extends ScallopConf(args) {


val dataset = opt[String]()
val database = opt[String]()
val command = opt[String]()
val filename = opt[String]()
val configpath = opt[String]()
// max # of results returned. Don't make it too high.
var limit: Int = 200
var sampleLimit: Int = 1000000
var timeoutSeconds: Int = 60
var outfile: Option[String] = None
var delimiter: String = ","
var indexName: Option[String] = None
var host: Option[String] = None
var port: Int = 2552
var promql: Option[String] = None
var schema: Option[String] = None
var hexPk: Option[String] = None
var hexVector: Option[String] = None
var hexChunkInfo: Option[String] = None
var vectorType: Option[String] = None
var matcher: Option[String] = None
var labelNames: Seq[String] = Seq.empty
var labelFilter: Map[String, String] = Map.empty
var start: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
var end: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
var minutes: Option[String] = None
var step: Long = 10 // in seconds
var chunks: Option[String] = None // select either "memory" or "buffers" chunks only
var everyNSeconds: Option[String] = None
var shards: Option[Seq[String]] = None
var spread: Option[Integer] = None
val limit = opt[Int](default = Some(200))
val samplelimit = opt[Int](default = Some(1000000))
val timeoutseconds = opt[Int](default = Some(60))
val outfile = opt[String]()
val indexname = opt[String]()
val host = opt[String]()
val port = opt[Int](default = Some(2552))
val promql = opt[String]()
val schema = opt[String]()
val hexpk = opt[String]()
val hexvector = opt[String]()
val hexchunkinfo = opt[String]()
val vectortype = opt[String]()
val matcher = opt[String]()
val labelnames = opt[List[String]](default = Some(List()))
val labelfilter = opt[Map[String, String]](default = Some(Map.empty))
val currentTime = System.currentTimeMillis()/1000

// val starts = opt[Long](default = Some(currentTime))
val start = opt[Long](default = Some(currentTime))// promql argument is seconds since epoch
val end = opt[Long](default = Some(currentTime))// promql argument is seconds since epoch
val minutes = opt[String]()
val step = opt[Long](default = Some(10)) // in seconds
val chunks = opt[String]() // select either "memory" or "buffers" chunks only
val everynseconds = opt[String]()
val shards = opt[List[String]]()
val spread = opt[Int]()
verify()

override def onError(e: Throwable): Unit = e match {

case ScallopException(message) => throw e
case other => throw other
}

}
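
The sumac-based FieldArgs class is replaced by a Scallop ScallopConf: each flag is declared with opt[...], verify() runs the parse, and callers read values with apply() for required access or toOption for optional access. A minimal usage sketch follows; the argument values are made up for illustration.

// Hypothetical invocation, for illustration only.
val parsed = new Arguments(Seq(
  "--host", "localhost",
  "--port", "6564",
  "--command", "indexnames",
  "--dataset", "prometheus"
))

parsed.host.toOption  // Some("localhost"), safe access for optional flags
parsed.dataset()      // "prometheus", apply() assumes the flag was supplied
parsed.limit()        // 200, falls back to the declared default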

object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
object CliMain extends FilodbClusterNode {
var exitCode = 0

override val role = ClusterRole.Cli
@@ -83,46 +96,46 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
println("\nStandalone client commands:")
println(" --host <hostname/IP> [--port ...] --command indexnames --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command indexvalues --indexname <index> --dataset <dataset> --shards SS")
println(" --host <hostname/IP> [--port ...] [--metricColumn <col>] --dataset <dataset> --promql <query> --start <start> --step <step> --end <end>")
println(" --host <hostname/IP> [--port ...] --command setup --filename <configFile> | --configPath <path>")
println(" --host <hostname/IP> [--port ...] --dataset <dataset> --promql <query> --start <start> --step <step> --end <end>")
println(" --host <hostname/IP> [--port ...] --command setup --filename <configFile> | --configpath <path>")
println(" --host <hostname/IP> [--port ...] --command list")
println(" --host <hostname/IP> [--port ...] --command status --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command timeseriesMetadata --matcher <matcher-query> --dataset <dataset> --start <start> --end <end>")
println(" --host <hostname/IP> [--port ...] --command labelValues --labelName <lable-names> --labelFilter <label-filter> --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command labelvalues --labelName <lable-names> --labelfilter <label-filter> --dataset <dataset>")
println(""" --command promFilterToPartKeyBR --promql "myMetricName{_ws_='myWs',_ns_='myNs'}" --schema prom-counter""")
println(""" --command partKeyBrAsString --hexPk 0x2C0000000F1712000000200000004B8B36940C006D794D65747269634E616D650E00C104006D794E73C004006D795773""")
println(""" --command decodeChunkInfo --hexChunkInfo 0x12e8253a267ea2db060000005046fc896e0100005046fc896e010000""")
println(""" --command decodeVector --hexVector 0x1b000000080800000300000000000000010000000700000006080400109836 --vectorType d""")
println(""" --command partKeyBrAsString --hexpk 0x2C0000000F1712000000200000004B8B36940C006D794D65747269634E616D650E00C104006D794E73C004006D795773""")
println(""" --command decodeChunkInfo --hexchunkinfo 0x12e8253a267ea2db060000005046fc896e0100005046fc896e010000""")
println(""" --command decodeVector --hexvector 0x1b000000080800000300000000000000010000000700000006080400109836 --vectortype d""")
println("\nTo change config: pass -Dconfig.file=/path/to/config as first arg or set $FILO_CONFIG_FILE")
println(" or override any config by passing -Dconfig.path=newvalue as first args")
println("\nFor detailed debugging, uncomment the TRACE/DEBUG loggers in logback.xml and add these ")
println(" options: ./filo-cli -Dakka.loglevel=DEBUG -Dakka.actor.debug.receive=on -Dakka.actor.debug.autoreceive=on --command importcsv ...")
}

def getRef(args: Arguments): DatasetRef = DatasetRef(args.dataset.get, args.database)
def getRef(args: Arguments): DatasetRef = DatasetRef(args.dataset(), args.database.toOption)

def getClientAndRef(args: Arguments): (LocalClient, DatasetRef) = {
require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
(remote, DatasetRef(args.dataset.get))
require(args.host.isDefined && args.dataset.isDefined, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
(remote, DatasetRef(args.dataset()))
}

def getQueryRange(args: Arguments): TimeRangeParams =
args.chunks.filter { cOpt => cOpt == "memory" || cOpt == "buffers" }
.map {
case "memory" => InMemoryParam(args.step)
case "buffers" => WriteBuffersParam(args.step)
case "memory" => InMemoryParam(args.step())
case "buffers" => WriteBuffersParam(args.step())
}.getOrElse {
args.minutes.map { minArg =>
val end = System.currentTimeMillis() / 1000
TimeStepParams(end - minArg.toInt * 60, args.step, end)
}.getOrElse(TimeStepParams(args.start, args.step, args.end))
TimeStepParams(end - minArg.toInt * 60, args.step(), end)
}.getOrElse(TimeStepParams(args.start(), args.step(), args.end()))
}
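
Reading the rewritten getQueryRange above: --chunks memory|buffers takes precedence, then --minutes (a window ending now), and otherwise the explicit --start/--step/--end values are used. A small sketch of the last case; the epoch-second values are invented for illustration.

// Illustrative only; relies on the Arguments defaults declared earlier.
val rangeArgs = new Arguments(Seq(
  "--dataset", "prometheus",
  "--start", "1604880000",
  "--step", "60",
  "--end", "1604883600"
))

// No --chunks and no --minutes, so the explicit range wins:
CliMain.getQueryRange(rangeArgs)  // TimeStepParams(1604880000, 60, 1604883600)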

def main(args: Arguments): Unit = {
def main(rawArgs: Array[String]): Unit = {
val args = new Arguments(rawArgs)
try {
val timeout = args.timeoutSeconds.seconds
args.command match {
val timeout = args.timeoutseconds().seconds
args.command.toOption match {
case Some("init") =>
println("Initializing FiloDB Admin keyspace and tables...")
parse(metaStore.initialize(), timeout) {
@@ -137,7 +150,7 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {

case Some("list") =>
args.host.map { server =>
listRegisteredDatasets(Client.standaloneClient(system, server, args.port))
listRegisteredDatasets(Client.standaloneClient(system, server, args.port()))
}

case Some("indexnames") =>
@@ -146,10 +159,10 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
names.foreach(println)

case Some("indexvalues") =>
require(args.indexName.nonEmpty, "--indexName required")
require(args.shards.nonEmpty, "--shards required")
require(args.indexname.isDefined, "--indexName required")
require(args.shards.isDefined, "--shards required")
val (remote, ref) = getClientAndRef(args)
val values = remote.getIndexValues(ref, args.indexName.get, args.shards.get.head.toInt, args.limit)
val values = remote.getIndexValues(ref, args.indexname(), args.shards().head.toInt, args.limit())
values.foreach { case (term, freq) => println(f"$term%40s\t$freq") }

case Some("status") =>
@@ -159,45 +172,45 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
case Some("validateSchemas") => validateSchemas()

case Some("promFilterToPartKeyBR") =>
require(args.promql.nonEmpty && args.schema.nonEmpty, "--promql and --schema must be defined")
promFilterToPartKeyBr(args.promql.get, args.schema.get)
require(args.promql.isDefined && args.schema.isDefined, "--promql and --schema must be defined")
promFilterToPartKeyBr(args.promql(), args.schema())

case Some("partKeyBrAsString") =>
require(args.hexPk.nonEmpty, "--hexPk must be defined")
partKeyBrAsString(args.hexPk.get)
require(args.hexpk.isDefined, "--hexPk must be defined")
partKeyBrAsString(args.hexpk())

case Some("decodeChunkInfo") =>
require(args.hexChunkInfo.nonEmpty, "--hexChunkInfo must be defined")
decodeChunkInfo(args.hexChunkInfo.get)
require(args.hexchunkinfo.isDefined, "--hexChunkInfo must be defined")
decodeChunkInfo(args.hexchunkinfo())

case Some("decodeVector") =>
require(args.hexVector.nonEmpty && args.vectorType.nonEmpty, "--hexVector and --vectorType must be defined")
decodeVector(args.hexVector.get, args.vectorType.get)
require(args.hexvector.isDefined && args.vectortype.isDefined, "--hexVector and --vectorType must be defined")
decodeVector(args.hexvector(), args.vectortype())

case Some("timeseriesMetadata") =>
require(args.host.nonEmpty && args.dataset.nonEmpty && args.matcher.nonEmpty, "--host, --dataset and --matcher must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseTimeSeriesMetadataQuery(remote, args.matcher.get, args.dataset.get,
require(args.host.isDefined && args.dataset.isDefined && args.matcher.isDefined, "--host, --dataset and --matcher must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
val options = QOptions(args.limit(), args.samplelimit(), args.everynseconds.map(_.toInt).toOption,
timeout, args.shards.map(_.map(_.toInt)).toOption, args.spread.toOption.map(Integer.valueOf))
parseTimeSeriesMetadataQuery(remote, args.matcher(), args.dataset(),
getQueryRange(args), true, options)

case Some("labelValues") =>
require(args.host.nonEmpty && args.dataset.nonEmpty && args.labelNames.nonEmpty, "--host, --dataset and --labelName must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseLabelValuesQuery(remote, args.labelNames, args.labelFilter, args.dataset.get,
require(args.host.isDefined && args.dataset.isDefined && args.labelnames.isDefined, "--host, --dataset and --labelName must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
val options = QOptions(args.limit(), args.samplelimit(), args.everynseconds.map(_.toInt).toOption,
timeout, args.shards.map(_.map(_.toInt)).toOption, args.spread.toOption.map(Integer.valueOf))
parseLabelValuesQuery(remote, args.labelnames(), args.labelfilter(), args.dataset(),
getQueryRange(args), options)

case x: Any =>
// This will soon be deprecated
args.promql.map { query =>
require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parsePromQuery2(remote, query, args.dataset.get, getQueryRange(args), options)
require(args.host.isDefined && args.dataset.isDefined, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
val options = QOptions(args.limit(), args.samplelimit(), args.everynseconds.toOption.map(_.toInt),
timeout, args.shards.toOption.map(_.map(_.toInt)), args.spread.toOption.map(Integer.valueOf))
parsePromQuery2(remote, query, args.dataset(), getQueryRange(args), options)
}
.getOrElse(printHelp)
}
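One recurring pattern in the rewritten command handlers above: Scallop options are converted with toOption (and, for --spread, boxed via Integer.valueOf) before being handed to QOptions. The sketch below restates those conversions in one place; the result types are inferred from the call sites in the diff, not from the QOptions definition itself.

// Assumed types, based only on how QOptions is invoked above.
def queryOptionInputs(args: Arguments): (Option[Integer], Option[Int], Option[Seq[Int]]) = (
  args.spread.toOption.map(Integer.valueOf),   // --spread
  args.everynseconds.toOption.map(_.toInt),    // --everynseconds
  args.shards.toOption.map(_.map(_.toInt))     // --shards
)
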
33 changes: 33 additions & 0 deletions cli/src/test/scala/filodb/cli/FilodbCliSpec.scala
@@ -1,10 +1,14 @@
package filodb.cli

import org.rogach.scallop.exceptions.ScallopException

import filodb.coordinator.{ActorName, ClusterRole, RunnableSpec}

class FilodbCliSpec extends RunnableSpec {
"A Filodb Cli" must {
"initialize" in {

testScallopOptions()
eventually(CliMain.cluster.isInitialized)
}
"create and setup the coordinatorActor and clusterActor" in {
@@ -20,4 +24,33 @@ class FilodbCliSpec extends RunnableSpec {
eventually(CliMain.cluster.isTerminated)
}
}

def testScallopOptions(): Unit = {


parseSucessFully("--host localhost --command indexnames --dataset prometheus")
parseSucessFully("--host localhost --port 6564 --command indexvalues --indexname asdasd --dataset prometheus --shards SS")
parseSucessFully("""--host localhost --port 6564 --dataset "adadasd" --promql "myMetricName{_ws_='myWs',_ns_='myNs'}" --start 1212 --step 5555 --end 1212""")
parseSucessFully("--host localhost --port 6564 --command timeseriesmetadata --matcher a=b --dataset prometheus --start 123123 --end 13123")
parseSucessFully("--host localhost --port 6564 --command labelvalues --labelnames a --labelfilter a=b --dataset prometheus")
parseSucessFully("""--command promFilterToPartKeyBR --promql "myMetricName{_ws_='myWs',_ns_='myNs'}" --schema prom-counter""")
parseSucessFully("""--command partKeyBrAsString --hexpk 0x2C0000000F1712000000200000004B8B36940C006D794D65747269634E616D650E00C104006D794E73C004006D795773""")
parseSucessFully("""--command decodeChunkInfo --hexchunkinfo 0x12e8253a267ea2db060000005046fc896e0100005046fc896e010000""")
parseSucessFully("""--command decodeVector --hexvector 0x1b000000080800000300000000000000010000000700000006080400109836 --vectortype d""")

parserError("""--host localhost --port 6564 --metriccolumn adasdasd --dataset "adadasd" --promql "myMetricName{_ws_='myWs',_ns_='myNs'}" --start 1231673123675123 --step 13131312313123123 --end 5""")
parserError("""--command partKeyBrAsString --hexPk 0x2C0000000F1712000000200000004B8B36940C006D794D65747269634E616D650E00C104006D794E73C004006D795773""")
parserError("""--command decodeChunkInfo --hexChunkInfo 0x12e8253a267ea2db060000005046fc896e0100005046fc896e010000""")
parserError("""--command decodeVector --hexVector 0x1b000000080800000300000000000000010000000700000006080400109836 --vectortype d""")

}
def parseSucessFully(commandLine: String): Unit = {
new Arguments(commandLine.split(" "))
}

def parserError(commandLine: String):Unit = {
intercept[ScallopException]{
new Arguments(commandLine.split(" "))
}
}
}
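
A note on the onError override exercised by parserError above: Scallop's default error handling prints the message and exits the process, which would abort the whole test run, whereas rethrowing lets intercept[ScallopException] assert on bad command lines. The standalone sketch below mirrors that pattern; it is an illustration, not part of the commit.

import org.rogach.scallop.ScallopConf
import org.rogach.scallop.exceptions.ScallopException

class MiniArgs(args: Seq[String]) extends ScallopConf(args) {
  val port = opt[Int]()
  verify()
  // Rethrow instead of Scallop's default print-and-exit behaviour so the
  // caller (or a test) can handle the failure itself.
  override def onError(e: Throwable): Unit = throw e
}

// In a test: intercept[ScallopException] { new MiniArgs(Seq("--no-such-flag")) }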
23 changes: 23 additions & 0 deletions conf/logback-perf.xml
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<jmxConfigurator />

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%date{ISO8601}] %-5level %thread %logger{26} [%X{akkaSource}] - %msg%n</pattern>
</encoder>
</appender>

<logger name="filodb.coordinator.KamonMetricsLogReporter" level="off" />
<logger name="filodb.coordinator.KamonSpanLogReporter" level="off" />
<logger name="filodb.core.memstore.LuceneMetricsRouter" level="off" />

<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
<logger name="org.apache.kafka.common.security" level="WARN"/>
<logger name="org.apache.kafka.common.network" level="WARN"/>
<logger name="org.apache.kafka.common.metrics" level="ERROR"/>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
13 changes: 13 additions & 0 deletions conf/promperf-filodb-server.conf
@@ -0,0 +1,13 @@
include "timeseries-filodb-server.conf"

filodb {
dataset-configs = [
"conf/promperf-source.conf"
]

spread-default = 3

# Override default spread for application using override block which will have non metric shard keys and spread.
spread-assignment = []

}
13 changes: 13 additions & 0 deletions conf/promperf-source.conf
@@ -0,0 +1,13 @@
include "timeseries-dev-source.conf"

dataset = "promperf"
num-shards = 8
min-num-nodes = 1
sourceconfig {
filo-topic-name = "prom-perf"
store {
flush-interval = 2m
disk-time-to-live = 720 hours // 30 days
shard-mem-size = 128MB
}
}
