This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Refactor ingestor flags to be in ingestor package
cevian committed Nov 3, 2022
1 parent c7738f1 commit d12f1ab
Showing 12 changed files with 82 additions and 59 deletions.
9 changes: 2 additions & 7 deletions pkg/pgclient/client.go
@@ -202,15 +202,10 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose)
c := ingestor.Cfg{
c := ingestor.Config{
NumCopiers: numCopiers,
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
MetricsAsyncAcks: cfg.MetricsAsyncAcks,
TracesAsyncAcks: cfg.TracesAsyncAcks,
InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize,
TracesBatchTimeout: cfg.TracesBatchTimeout,
TracesMaxBatchSize: cfg.TracesMaxBatchSize,
TracesBatchWorkers: cfg.TracesBatchWorkers,
Flags: &cfg.IngestorFlags,
}

var (
23 changes: 8 additions & 15 deletions pkg/pgclient/config.go
@@ -17,13 +17,14 @@ import (
"github.com/timescale/promscale/pkg/limits"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
"github.com/timescale/promscale/pkg/pgmodel/ingestor"
"github.com/timescale/promscale/pkg/version"
)

// Config for the database.
type Config struct {
CacheConfig cache.Config
IngestorFlags ingestor.Flags
AppName string
Host string
Port int
@@ -32,9 +33,6 @@ type Config struct {
Database string
SslMode string
DbConnectionTimeout time.Duration
IgnoreCompressedChunks bool
MetricsAsyncAcks bool
TracesAsyncAcks bool
WriteConnections int
WriterPoolSize int
WriterSynchronousCommit bool
@@ -44,9 +42,6 @@ type Config struct {
UsesHA bool
DbUri string
EnableStatementsCache bool
TracesBatchTimeout time.Duration
TracesMaxBatchSize int
TracesBatchWorkers int
}

const (
@@ -75,6 +70,7 @@ var (
// ParseFlags parses the configuration flags specific to PostgreSQL and TimescaleDB
func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
cache.ParseFlags(fs, &cfg.CacheConfig)
ingestor.ParseFlags(fs, &cfg.IngestorFlags)

fs.StringVar(&cfg.AppName, "db.app", DefaultApp, "This sets the application_name in database connection string. "+
"This is helpful during debugging when looking at pg_stat_activity.")
@@ -85,9 +81,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.StringVar(&cfg.Database, "db.name", defaultDBName, "Database name.")
fs.StringVar(&cfg.SslMode, "db.ssl-mode", defaultSSLMode, "TimescaleDB connection ssl mode. If you do not want to use ssl, pass 'allow' as value.")
fs.DurationVar(&cfg.DbConnectionTimeout, "db.connection-timeout", defaultConnectionTime, "Timeout for establishing the connection between Promscale and TimescaleDB.")
fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+
"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+
"However, setting this to true will save your resources that may be required during decompression. ")

fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics/traces to database. "+
"By default, this will be set based on the number of CPUs available to the DB Promscale is connected to.")
fs.IntVar(&cfg.WriterPoolSize, "db.connections.writer-pool.size", defaultPoolSize, "Maximum size of the writer pool of database connections. This defaults to 50% of max_connections "+
@@ -102,18 +96,17 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
fs.BoolVar(&cfg.EnableStatementsCache, "db.statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+
"Disable if using PgBouncer")
fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", true, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")

return cfg
}

func Validate(cfg *Config, lcfg limits.Config) error {
if err := cfg.validateConnectionSettings(); err != nil {
return err
}
if err := ingestor.Validate(&cfg.IngestorFlags); err != nil {
return err
}
return cache.Validate(&cfg.CacheConfig, lcfg)
}

8 changes: 4 additions & 4 deletions pkg/pgmodel/ingestor/dispatcher.go
@@ -52,7 +52,7 @@ type pgxDispatcher struct {

var _ model.Dispatcher = &pgxDispatcher{}

func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) {
func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Config) (*pgxDispatcher, error) {
numCopiers := cfg.NumCopiers
if numCopiers < 1 {
log.Warn("msg", "num copiers less than 1, setting to 1")
@@ -66,7 +66,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh)))
metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) })

if cfg.IgnoreCompressedChunks {
if cfg.Flags.IgnoreCompressedChunks {
// Handle decompression to not decompress anything.
handleDecompression = skipDecompression
}
@@ -94,7 +94,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
invertedLabelsCache: labelsCache,
exemplarKeyPosCache: eCache,
completeMetricCreation: make(chan struct{}, 1),
asyncAcks: cfg.MetricsAsyncAcks,
asyncAcks: cfg.Flags.MetricsAsyncAcks,
copierReadRequestCh: copierReadRequestCh,
// set to run at half our deletion interval
seriesEpochRefresh: time.NewTicker(30 * time.Minute),
@@ -112,7 +112,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac

go inserter.runCompleteMetricCreationWorker()

if !cfg.DisableEpochSync {
if !cfg.Flags.DisableEpochSync {
inserter.doneWG.Add(1)
go func() {
defer inserter.doneWG.Done()
37 changes: 37 additions & 0 deletions pkg/pgmodel/ingestor/flags.go
@@ -0,0 +1,37 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package ingestor

import (
"flag"
"time"

"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
)

type Flags struct {
MetricsAsyncAcks bool
TracesAsyncAcks bool
DisableEpochSync bool
IgnoreCompressedChunks bool
TracesBatchTimeout time.Duration
TracesMaxBatchSize int
TracesBatchWorkers int
}

func ParseFlags(fs *flag.FlagSet, cfg *Flags) {
fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+
"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+
"However, setting this to true will save your resources that may be required during decompression. ")
fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", true, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")
}

func Validate(cfg *Flags) error {
return nil
}
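
The new file gives the ingestor package the same flag-handling shape the cache package already uses: a Flags struct, a ParseFlags registrar, and a Validate hook. A minimal sketch of driving that API directly (outside pgclient, which wires it through cfg.IngestorFlags above) could look like the following; the flag-set name and the logging are illustrative assumptions, not part of this commit:

// Minimal sketch (not from this commit) of registering and validating the new
// ingestor flags from a standalone caller.
package main

import (
	"flag"
	"log"
	"os"

	"github.com/timescale/promscale/pkg/pgmodel/ingestor"
)

func main() {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)

	var ingestorFlags ingestor.Flags
	ingestor.ParseFlags(fs, &ingestorFlags) // registers the metrics.* and tracing.* flags above

	if err := fs.Parse(os.Args[1:]); err != nil {
		log.Fatal(err)
	}
	if err := ingestor.Validate(&ingestorFlags); err != nil { // currently always nil, per flags.go
		log.Fatal(err)
	}

	log.Printf("tracing.batch-workers = %d", ingestorFlags.TracesBatchWorkers)
}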
38 changes: 18 additions & 20 deletions pkg/pgmodel/ingestor/ingestor.go
@@ -23,18 +23,6 @@ import (
"github.com/timescale/promscale/pkg/tracer"
)

type Cfg struct {
MetricsAsyncAcks bool
TracesAsyncAcks bool
NumCopiers int
DisableEpochSync bool
IgnoreCompressedChunks bool
InvertedLabelsCacheSize uint64
TracesBatchTimeout time.Duration
TracesMaxBatchSize int
TracesBatchWorkers int
}

// DBIngestor ingest the TimeSeries data into Timescale database.
type DBIngestor struct {
sCache cache.SeriesCache
@@ -43,33 +31,42 @@ type DBIngestor struct {
closed *atomic.Bool
}

type Config struct {
NumCopiers int
InvertedLabelsCacheSize uint64
Flags *Flags
}

// NewPgxIngestor returns a new Ingestor that uses connection pool and a metrics cache
// for caching metric table names.
func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*DBIngestor, error) {
func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Config) (*DBIngestor, error) {
dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, cfg)
if err != nil {
return nil, err
}

batcherConfg := trace.BatcherConfig{
MaxBatchSize: cfg.TracesMaxBatchSize,
BatchTimeout: cfg.TracesBatchTimeout,
MaxBatchSize: cfg.Flags.TracesMaxBatchSize,
BatchTimeout: cfg.Flags.TracesBatchTimeout,
Writers: cfg.NumCopiers,
}
traceWriter := trace.NewWriter(conn)
return &DBIngestor{
sCache: sCache,
dispatcher: dispatcher,
tWriter: trace.NewDispatcher(traceWriter, cfg.TracesAsyncAcks, batcherConfg),
tWriter: trace.NewDispatcher(traceWriter, cfg.Flags.TracesAsyncAcks, batcherConfg),
closed: atomic.NewBool(false),
}, nil
}

// NewPgxIngestorForTests returns a new Ingestor that write to PostgreSQL using PGX
// with an empty config, a new default size metrics cache and a non-ha-aware data parser
func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) {
func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Config) (*DBIngestor, error) {
if cfg == nil {
cfg = &Cfg{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2}
cfg = &Config{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2}
}
if cfg.Flags == nil {
cfg.Flags = &Flags{}
}
cacheConfig := cache.DefaultConfig
c := cache.NewMetricCache(cacheConfig)
@@ -101,8 +98,9 @@ func (ingestor *DBIngestor) IngestTraces(ctx context.Context, traces ptrace.Trac

// IngestMetrics transforms and ingests the timeseries data into Timescale database.
// input:
// req the WriteRequest backing tts. It will be added to our WriteRequest
// pool when it is no longer needed.
//
// req the WriteRequest backing tts. It will be added to our WriteRequest
// pool when it is no longer needed.
func (ingestor *DBIngestor) IngestMetrics(ctx context.Context, r *prompb.WriteRequest) (numInsertablesIngested uint64, numMetadataIngested uint64, err error) {
if ingestor.closed.Load() {
return 0, 0, fmt.Errorf("ingestor is closed and can't ingest metrics")
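
At the call sites, the tuning knobs that used to sit directly on Cfg now travel on Config.Flags. A hedged sketch of the refactored construction path follows; it assumes an existing pgxconn.PgxConn and caches (as pgclient.NewClientWithPool supplies above), the pkg/pgxconn import path, and hand-picked values where ingestor.ParseFlags would normally fill in Flags:

// Hypothetical helper illustrating the new Config/Flags split; the package name,
// function name, and literal values are illustrative only.
package example

import (
	"github.com/timescale/promscale/pkg/pgmodel/cache"
	"github.com/timescale/promscale/pkg/pgmodel/ingestor"
	"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
	"github.com/timescale/promscale/pkg/pgxconn"
)

func newIngestorExample(conn pgxconn.PgxConn, m cache.MetricCache, s cache.SeriesCache, p cache.PositionCache) (*ingestor.DBIngestor, error) {
	cfg := &ingestor.Config{
		NumCopiers:              4,
		InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
		// Normally populated by ingestor.ParseFlags; set explicitly here for the sketch.
		Flags: &ingestor.Flags{
			TracesAsyncAcks:    true,
			TracesMaxBatchSize: trace.DefaultBatchSize,
			TracesBatchTimeout: trace.DefaultBatchTimeout,
			TracesBatchWorkers: trace.DefaultBatchWorkers,
		},
	}
	return ingestor.NewPgxIngestor(conn, m, s, p, cfg)
}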
2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/ingestor_sql_test.go
@@ -932,7 +932,7 @@ func TestPGXInserterInsertData(t *testing.T) {
if err != nil {
t.Fatalf("error setting up mock cache: %s", err.Error())
}
inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10, NumCopiers: 2})
inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Config{Flags: &Flags{DisableEpochSync: true}, InvertedLabelsCacheSize: 10, NumCopiers: 2})
if err != nil {
t.Fatal(err)
}
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/drop_test.go
@@ -459,8 +459,8 @@ func TestSQLDropMetricChunk(t *testing.T) {
}

c := cache.NewMetricCache(cache.DefaultConfig)
ingestor, err := ingstr.NewPgxIngestor(pgxconn.NewPgxConn(db), c, scache, nil, &ingstr.Cfg{
DisableEpochSync: true,
ingestor, err := ingstr.NewPgxIngestor(pgxconn.NewPgxConn(db), c, scache, nil, &ingstr.Config{
Flags: &ingstr.Flags{DisableEpochSync: true},
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: 2,
})
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/ha_single_promscale_test.go
@@ -124,8 +124,8 @@ func prepareWriterWithHa(db *pgxpool.Pool, t testing.TB) (*util.ManualTicker, ht
dataParser.AddPreprocessor(ha.NewFilter(haService))
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}

ing, err := ingestor.NewPgxIngestor(pgxconn.NewPgxConn(db), mCache, sCache, nil, &ingestor.Cfg{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2,
ing, err := ingestor.NewPgxIngestor(pgxconn.NewPgxConn(db), mCache, sCache, nil, &ingestor.Config{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2, Flags: &ingestor.Flags{},
})
if err != nil {
t.Fatalf("could not create ingestor: %v", err)
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go
@@ -64,8 +64,8 @@ func TestInsertInCompressedChunks(t *testing.T) {

// With decompress chunks being false.
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Cfg{
IgnoreCompressedChunks: true,
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Config{
Flags: &ingstr.Flags{IgnoreCompressedChunks: true},
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: 2,
})
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/jaeger_store_integration_test.go
@@ -60,10 +60,10 @@ func TestJaegerStorageIntegration(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
withDB(t, "jaeger_storage_integration_tests", func(db *pgxpool.Pool, t testing.TB) {
cfg := &ingestor.Cfg{
cfg := &ingestor.Config{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: runtime.NumCPU() / 2,
TracesAsyncAcks: true, // To make GetLargeSpans happy, otherwise it takes quite a few time to ingest.
Flags: &ingestor.Flags{TracesAsyncAcks: true}, // To make GetLargeSpans happy, otherwise it takes quite a few time to ingest.
}
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), cfg)
require.NoError(t, err)
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/metric_ingest_bench_test.go
@@ -64,7 +64,7 @@ func BenchmarkMetricIngest(b *testing.B) {

withDB(b, "bench_e2e_metric_ingest", func(db *pgxpool.Pool, t testing.TB) {
b.StopTimer()
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Cfg{
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Config{
NumCopiers: 8,
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
})
@@ -119,7 +119,7 @@ func BenchmarkNewSeriesIngestion(b *testing.B) {

for i := 0; i < b.N; i++ {
withDB(b, "bench_e2e_new_series_ingest", func(db *pgxpool.Pool, t testing.TB) {
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Cfg{
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Config{
NumCopiers: 8,
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
})
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/trace_ingest_bench_test.go
@@ -33,10 +33,10 @@ func BenchmarkTracesIngest(b *testing.B) {
for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
withDB(b, "trace_ingest_bench", func(db *pgxpool.Pool, t testing.TB) {
cfg := &ingestor.Cfg{
cfg := &ingestor.Config{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: runtime.NumCPU() / 2,
TracesAsyncAcks: c.async,
Flags: &ingestor.Flags{TracesAsyncAcks: c.async},
}
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), cfg)
require.NoError(t, err)
