0.9.2 Release
tjackpaul committed Jan 16, 2020
2 parents cc6a8f9 + 633defc commit daa230e
Showing 59 changed files with 1,921 additions and 526 deletions.
46 changes: 39 additions & 7 deletions README.md
@@ -158,9 +158,10 @@ sbt standalone/assembly cli/assembly gateway/assembly

First initialize the keyspaces and tables in Cassandra.
```
./filo-cli -Dconfig.file=conf/timeseries-filodb-server.conf --command init
./scripts/schema-create.sh filodb_admin filodb filodb_downsample prometheus 4 1,5 > /tmp/ddl.cql
cqlsh -f /tmp/ddl.cql
```
Verify that tables were created in `filodb` and `filodb-admin` keyspaces using `cqlsh`:
Verify that tables were created in `filodb`, `filodb_downsample` and `filodb-admin` keyspaces using `cqlsh`:
First type `cqlsh` to start the Cassandra CLI, then check the keyspaces by entering `DESCRIBE keyspaces`.


@@ -173,7 +174,6 @@ The script below brings up the FiloDB Dev Standalone server, and then sets up th
Note that the above script starts the server with configuration at `conf/timeseries-filodb-server.conf`. This config
file refers to the following datasets that will be loaded on bootstrap:
* `conf/timeseries-dev-source.conf`
* `conf/timeseries-ds-1m-dev-source.conf`

For queries to work properly you'll want to start a second server to serve all the shards:

@@ -280,6 +280,18 @@ The following will scrape metrics from FiloDB using its Prometheus metrics endpo

Now, metrics from the application exposing a Prometheus endpoint at port 9095 will be streamed into Kafka and FiloDB.

Querying the total number of ingesting time series for the last 5 minutes, every 10 seconds:

./filo-cli --host 127.0.0.1 --dataset prometheus --promql 'sum(num_ingesting_partitions{_ws_="local_test",_ns_="filodb"})' --minutes 5

Note that histograms are ingested using FiloDB's optimized histogram format, which leads to very large savings in space. For example, querying the 90th percentile of the size of chunks written to Cassandra over the last 5 minutes:

./filo-cli '-Dakka.remote.netty.tcp.hostname=127.0.0.1' --host 127.0.0.1 --dataset prometheus --promql 'histogram_quantile(0.9, sum(rate(chunk_bytes_per_call{_ws_="local_test",_ns_="filodb"}[3m])))' --minutes 5

Here is how to display the raw histogram data for the same metric:

./filo-cli '-Dakka.remote.netty.tcp.hostname=127.0.0.1' --host 127.0.0.1 --dataset prometheus --promql 'chunk_bytes_per_call{_ws_="local_test",_ns_="filodb"}' --minutes 5

#### Multiple Servers using Consul

The original example used a static IP to form a cluster, but a more realistic example is to use a registration service like Consul.
@@ -307,10 +319,28 @@ Start first FiloDB server
```
./filodb-dev-start.sh -c conf/timeseries-filodb-server-consul.conf -l 1
```
And subsequent FiloDB servers. Change log file suffix with the `-l` option for each server. Add the `-s` option to
the last server, so data setup is initiated after all servers come up.
Then start subsequent FiloDB servers, changing the log file suffix with the `-l` option for each server.
```
./filodb-dev-start.sh -c conf/timeseries-filodb-server-consul.conf -l 2 -p
```

### Downsample Filo Cluster

To bring up a local cluster for serving downsampled data:

```
./filodb-dev-start.sh -c conf/timeseries-filodb-server-ds.conf -l 1
```
Then start subsequent servers, changing the log file suffix with the `-l` option for each server.
```
./filodb-dev-start.sh -c conf/timeseries-filodb-server-ds.conf -l 2 -p
```

If you have run the unit test `DownsamplerMainSpec`, which populates data into the downsample dataset, you can query the downsampled results by issuing the following query:

```
curl "http://localhost:8080/promql/prometheus/api/v1/query_range?query=my_counter\{_ws_='my_ws',_ns_='my_ns'\}&start=1574272801&end=1574273042&step=10&verbose=true&spread=2"
```

#### Local Scale Testing
@@ -418,7 +448,7 @@ The number of shards in each dataset is preconfigured in the source config. Ple

Metrics are routed to shards based on two factors:

1. Shard keys, which can be for example an application and the metric name, which define a group of shards to use for that application. This allows limiting queries to a subset of shards for lower latency. Currently `_ws_`, `_ns_`, `metric` labels are mandatory to calculate shard key along with `metric` column.
1. Shard keys, for example an application and the metric name, define a group of shards to use for that application. This allows queries to be limited to a subset of shards for lower latency. Currently the `_ws_` and `_ns_` labels are mandatory for calculating the shard key, along with the `_metric_` column.
2. The rest of the tags or components of a partition key are then used to compute which shard within that group the time series is assigned to.
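
To make the two-level routing concrete, here is a minimal Scala sketch; the hash functions, shard counts, and names below are illustrative assumptions, not FiloDB's actual assignment code:

```scala
// Illustrative sketch of two-level shard routing; not FiloDB's real implementation.
object ShardRoutingSketch {
  // Level 1: the shard key (_ws_, _ns_ and the _metric_ column) picks a group of shards,
  // so queries scoped to one application only need to touch that group.
  // Level 2: the remaining partition-key tags pick a shard within the group.
  def pickShard(ws: String, ns: String, metric: String,
                otherTags: Map[String, String],
                numShards: Int, shardsPerGroup: Int): Int = {
    val numGroups   = numShards / shardsPerGroup
    val group       = Math.floorMod((ws, ns, metric).hashCode, numGroups)
    val withinGroup = Math.floorMod(otherTags.toSeq.sorted.hashCode, shardsPerGroup)
    group * shardsPerGroup + withinGroup
  }
}

// e.g. ShardRoutingSketch.pickShard("local_test", "filodb", "num_ingesting_partitions",
//                                   Map("host" -> "h1"), numShards = 32, shardsPerGroup = 8)
```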

## Querying FiloDB
@@ -445,6 +475,8 @@ Example of debugging chunk metadata using the CLI:

./filo-cli --host 127.0.0.1 --dataset prometheus --promql '_filodb_chunkmeta_all(heap_usage{_ws_="demo",_ns_="App-0"})' --start XX --end YY

There is also a special filter, `_type_="gauge"`, to filter on only a particular type of metric or schema. Normally, this is not necessary unless a user changes the type of metric in their application, for example from a gauge to a histogram. The types are found in the configuration `schemas` section, and by default are `gauge`, `prom-counter`, `prom-histogram`, and `untyped`.
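
For example, a query can be restricted to just the gauge schema; this is an illustrative query following the CLI examples above, so adjust the metric and the `_ws_`/`_ns_` labels to your own setup:

./filo-cli --host 127.0.0.1 --dataset prometheus --promql 'heap_usage{_ws_="demo",_ns_="App-0",_type_="gauge"}' --minutes 5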

### First-Class Histogram Support

One major difference FiloDB has from the Prometheus data model is that FiloDB supports histograms as a first-class entity. In Prometheus, histograms are stored with each bucket in its own time series differentiated by the `le` tag. In FiloDB, there is a `HistogramColumn` which stores all the buckets together for significantly improved compression, especially over the wire during ingestion, as well as significantly faster query speeds (up to two orders of magnitude). There is no "le" tag or individual time series for each bucket. Here are the differences users need to be aware of when using `HistogramColumn`:
@@ -24,7 +24,7 @@ class CassandraTSStoreFactory(config: Config, ioPool: Scheduler) extends StoreFa
}

class DownsampledTSStoreFactory(config: Config, ioPool: Scheduler) extends StoreFactory {
val colStore = new CassandraColumnStore(config, ioPool)(ioPool)
val colStore = new CassandraColumnStore(config, ioPool, None, true)(ioPool)
val metaStore = new CassandraMetaStore(config.getConfig("cassandra"))(ioPool)
val memStore = new DownsampledTimeSeriesStore(colStore, metaStore, config)(ioPool)
}
@@ -35,9 +35,7 @@ trait FiloCassandraConnector extends StrictLogging {

implicit def ec: ExecutionContext

lazy val defaultKeySpace = config.getString("keyspace")

def keySpaceName(ref: DatasetRef): String = ref.database.getOrElse(defaultKeySpace)
def keyspace: String

def createKeyspace(keyspace: String): Unit = {
val replOptions = config.getString("keyspace-replication-options")
@@ -11,7 +11,7 @@ trait BaseDatasetTable extends StrictLogging {
// The suffix for the dataset table, ie chunks, index, filter, etc.
def dataset: DatasetRef
def suffix: String
lazy val keyspace = dataset.database.getOrElse(connector.defaultKeySpace)
lazy val keyspace = connector.keyspace
lazy val tableString = s"${keyspace}.${dataset.dataset + s"_$suffix"}"
lazy val session = connector.session

@@ -46,7 +46,8 @@ import filodb.memory.BinaryRegionLarge
* @param sched A Scheduler for writes
*/
class CassandraColumnStore(val config: Config, val readEc: Scheduler,
val filoSessionProvider: Option[FiloSessionProvider] = None)
val filoSessionProvider: Option[FiloSessionProvider] = None,
val downsampledData: Boolean = false)
(implicit val sched: Scheduler)
extends ColumnStore with CassandraChunkSource with StrictLogging {
import collection.JavaConverters._
@@ -160,15 +161,20 @@

/**
* Reads chunks by querying partitions by ingestion time range and subsequently filtering by user time range.
* ** User/Ingestion End times are exclusive **
*
* Important Details:
* 1. User End time is exclusive. Important since we should not downsample one sample in two job runs
* 2. Since we do a query based on maxChunkTime which is usually configured to be slightly greater than
* flush interval, results can include chunks that are before the requested range. Callers need to
* handle this case.
*/
// scalastyle:off parameter.number
def getChunksByIngestionTimeRange(datasetRef: DatasetRef,
splits: Iterator[ScanSplit],
ingestionTimeStart: Long,
ingestionTimeEnd: Long,
userTimeStart: Long,
userTimeEnd: Long,
endTimeExclusive: Long,
maxChunkTime: Long,
batchSize: Int,
batchTime: FiniteDuration): Observable[Seq[RawPartData]] = {
@@ -186,10 +192,10 @@
val chunksTable = getOrCreateChunkTable(datasetRef)
partKeys.bufferTimedAndCounted(batchTime, batchSize).map { parts =>
logger.debug(s"Querying cassandra for chunks from ${parts.size} partitions userTimeStart=$userTimeStart " +
s"userTimeEnd=$userTimeEnd maxChunkTime=$maxChunkTime")
s"endTimeExclusive=$endTimeExclusive maxChunkTime=$maxChunkTime")
// TODO evaluate if we can increase parallelism here. This needs to be tuneable
// based on how much faster downsampling should run, and how much additional read load cassandra can take.
chunksTable.readRawPartitionRangeBB(parts, userTimeStart - maxChunkTime, userTimeEnd).toIterator().toSeq
chunksTable.readRawPartitionRangeBB(parts, userTimeStart - maxChunkTime, endTimeExclusive).toIterator().toSeq
}
}
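
A minimal sketch of how a caller (such as a downsampling job) might handle detail 2 above by dropping chunks that fall entirely outside the requested window; the simplified types and field names are illustrative assumptions rather than the actual classes in this file:

```scala
// Illustrative only: simplified stand-ins for the real chunk/partition types.
final case class ChunkSketch(startTime: Long, endTime: Long)
final case class PartDataSketch(partKeyHex: String, chunks: Seq[ChunkSketch])

// Keep only chunks overlapping [userTimeStart, endTimeExclusive); results returned by
// getChunksByIngestionTimeRange may include chunks starting before the requested range.
def trimToWindow(parts: Seq[PartDataSketch],
                 userTimeStart: Long,
                 endTimeExclusive: Long): Seq[PartDataSketch] =
  parts.map { p =>
    p.copy(chunks = p.chunks.filter(c => c.endTime >= userTimeStart && c.startTime < endTimeExclusive))
  }
```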

@@ -205,7 +211,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
* @return each split will have token_start, token_end, replicas filled in
*/
def getScanSplits(dataset: DatasetRef, splitsPerNode: Int = 1): Seq[ScanSplit] = {
val keyspace = clusterConnector.keySpaceName(dataset)
val keyspace = clusterConnector.keyspace
require(splitsPerNode >= 1, s"Must specify at least 1 splits_per_node, got $splitsPerNode")

val tokenRanges = unwrapTokenRanges(clusterMeta.getTokenRanges.asScala.toSeq)
@@ -257,7 +263,8 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
}

def writePartKeys(ref: DatasetRef, shard: Int,
partKeys: Observable[PartKeyRecord], diskTTLSeconds: Int): Future[Response] = {
partKeys: Observable[PartKeyRecord],
diskTTLSeconds: Int): Future[Response] = {
val table = getOrCreatePartitionKeysTable(ref, shard)
val span = Kamon.buildSpan("write-part-keys").start()
val ret = partKeys.mapAsync(writeParallelism) { pk =>
@@ -288,6 +295,8 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {

val stats = new ChunkSourceStats

def downsampledData: Boolean

val cassandraConfig = config.getConfig("cassandra")
val ingestionConsistencyLevel = ConsistencyLevel.valueOf(cassandraConfig.getString("ingestion-consistency-level"))
val tableCacheSize = config.getInt("columnstore.tablecache-size")
@@ -301,7 +310,10 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {
def config: Config = cassandraConfig
def ec: ExecutionContext = readEc
val sessionProvider = filoSessionProvider.getOrElse(new DefaultFiloSessionProvider(cassandraConfig))
}

val keyspace: String = if (!downsampledData) config.getString("keyspace")
else config.getString("downsample-keyspace")
}

val partParallelism = 4

@@ -356,7 +368,6 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {
new IngestionTimeIndexTable(dataset, clusterConnector)(readEc) })
}


def getOrCreatePartitionKeysTable(dataset: DatasetRef, shard: Int): PartitionKeysTable = {
val map = partitionKeysTableCache.getOrElseUpdate(dataset, { _ =>
concurrentCache[Int, PartitionKeysTable](tableCacheSize)
4 changes: 2 additions & 2 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -65,12 +65,12 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {

def printHelp(): Unit = {
println("filo-cli help:")
println(" commands: init create importcsv list truncate clearMetadata")
println(" commands: init create importcsv list clearMetadata")
println(" dataColumns/partitionColumns: <colName1>:<type1>,<colName2>:<type2>,... ")
println(" types: int,long,double,string,bitmap,ts,map")
println(" common options: --dataset --database")
println(" OR: --select col1, col2 [--limit <n>] [--outfile /tmp/out.csv]")
println("\n NOTE: truncate and clearMetadata should NOT be used while FiloDB instances are up\n")
println("\n NOTE: clearMetadata should NOT be used while FiloDB instances are up\n")
println("\nStandalone client commands:")
println(" --host <hostname/IP> [--port ...] --command indexnames --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command indexvalues --indexname <index> --dataset <dataset> --shards SS")
2 changes: 2 additions & 0 deletions conf/telegraf.conf
@@ -18,6 +18,8 @@
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
app = "filodb"
_ns_ = "filodb"
_ws_ = "local_test"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"

2 changes: 1 addition & 1 deletion conf/timeseries-dev-source.conf
@@ -107,7 +107,7 @@
# Retention of downsampled data for the corresponding resolution
ttls = [ 30 days, 183 days ]
# Raw schemas from which to downsample
raw-schema-names = [ "gauge", "untyped" ]
raw-schema-names = [ "gauge", "untyped", "prom-counter", "prom-histogram"]
# class implementing the dispatch of downsample metrics to another dataset
publisher-class = "filodb.kafka.KafkaDownsamplePublisher"
publisher-config {
5 changes: 5 additions & 0 deletions conf/timeseries-filodb-server-ds.conf
@@ -0,0 +1,5 @@
include "timeseries-filodb-server.conf"

filodb {
store-factory = "filodb.cassandra.DownsampledTSStoreFactory"
}
@@ -13,6 +13,7 @@ import akka.event.LoggingReceive
import kamon.Kamon
import monix.eval.Task
import monix.execution.{CancelableFuture, Scheduler, UncaughtExceptionReporter}
import monix.reactive.Observable
import net.ceedubs.ficus.Ficus._

import filodb.core.{DatasetRef, Iterators}
Expand Down Expand Up @@ -174,6 +175,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
_ <- memStore.recoverIndex(ref, shard)
} yield {
streamSubscriptions(shard) = CancelableFuture.never // simulate ingestion happens continuously
streams(shard) = IngestionStream(Observable.never)
}
} else {
for {
13 changes: 9 additions & 4 deletions coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -159,7 +159,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
publishChanges(dataset)
}
}
logger.info(s"Completed addMember for coordinator $coordinator")
logAllMappers(s"Completed addMember for coordinator $coordinator. Status Map:")
}

/** Called on MemberRemoved, new status already updated. */
@@ -168,6 +168,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
logger.info(s"Initiated removeMember for coordinator=$coordinator on $address")
_coordinators remove address
removeCoordinator(coordinator)
logAllMappers(s"Completed removeMember for coordinator $address")
coordinator
}
}
@@ -272,6 +273,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
s"for dataset=${shardStopReq.datasetRef} ")
val answer: Response = validateRequestAndStopShards(shardStopReq, ackTo)
.fold(_ => SuccessResponse, errorResponse => errorResponse)
logAllMappers(s"Completed stopShards $shardStopReq")
ackTo ! answer
}

@@ -315,6 +317,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
val answer: Response = validateRequestAndStartShards(shardStartReq.datasetRef,
shardStartReq.assignmentConfig, ackTo)
.fold(_ => SuccessResponse, errorResponse => errorResponse)
logAllMappers(s"Completed startShards $shardStartReq")
ackTo ! answer
}

@@ -360,7 +363,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
removeCoordinator(coord)
}
updateShardMetrics()
logAllMappers("After removing stale coordinators")
logAllMappers("Finished removing stale coordinators")
}

private def removeCoordinator(coordinator: ActorRef): Unit = {
@@ -403,7 +406,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
// Add dataset to subscribers and send initial ShardMapper snapshot
_subscriptions :+= ShardSubscription(dataset.ref, Set.empty)
_subscriptions.watchers foreach (subscribe(_, dataset.ref))
logger.info(s"Completed Setup for dataset=${dataset.ref}")
logAllMappers(s"Completed setup for dataset=${dataset.ref}")
ackTo.foreach(_ ! DatasetVerified)
assignments
}
@@ -435,7 +438,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
_datasetInfo remove dataset
_shardMappers remove dataset
_subscriptions = _subscriptions - dataset
logger.info(s"Completed removal for dataset=$dataset")
logAllMappers(s"Completed removal of dataset=$dataset")
}

/**
@@ -490,8 +493,10 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
if (assignments.valuesIterator.flatten.contains(event.shard)) {
setShardReassignmentTime(event.ref, event.shard, now)
info.metrics.numErrorReassignmentsDone.increment()
logAllMappers(s"Successfully reassigned dataset=${event.ref} shard=${event.shard}")
} else {
info.metrics.numErrorReassignmentsSkipped.increment()
logAllMappers(s"Could not reassign dataset=${event.ref} shard=${event.shard}")
logger.warn(s"Shard=${event.shard} from dataset=${event.ref} was NOT reassigned possibly " +
s"because no other node was available")
}
