Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Refactor ingestor flags to be in ingestor package #1735

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 2 additions & 7 deletions pkg/pgclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,10 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose)
c := ingestor.Cfg{
c := ingestor.Parameters{
NumCopiers: numCopiers,
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
MetricsAsyncAcks: cfg.MetricsAsyncAcks,
TracesAsyncAcks: cfg.TracesAsyncAcks,
InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize,
TracesBatchTimeout: cfg.TracesBatchTimeout,
TracesMaxBatchSize: cfg.TracesMaxBatchSize,
TracesBatchWorkers: cfg.TracesBatchWorkers,
Config: &cfg.IngestorFlags,
}

var (
Expand Down
23 changes: 8 additions & 15 deletions pkg/pgclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
"github.com/timescale/promscale/pkg/limits"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
"github.com/timescale/promscale/pkg/pgmodel/ingestor"
"github.com/timescale/promscale/pkg/version"
)

// Config for the database.
type Config struct {
CacheConfig cache.Config
IngestorFlags ingestor.Config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
IngestorFlags ingestor.Config
IngestorConfig ingestor.Config

For consistency: `Config` is the name used for almost every other configuration grouping, and we already have `CacheConfig` just above.

AppName string
Host string
Port int
Expand All @@ -32,9 +33,6 @@ type Config struct {
Database string
SslMode string
DbConnectionTimeout time.Duration
IgnoreCompressedChunks bool
MetricsAsyncAcks bool
TracesAsyncAcks bool
WriteConnections int
WriterPoolSize int
WriterSynchronousCommit bool
Expand All @@ -44,9 +42,6 @@ type Config struct {
UsesHA bool
DbUri string
EnableStatementsCache bool
TracesBatchTimeout time.Duration
TracesMaxBatchSize int
TracesBatchWorkers int
}

const (
Expand Down Expand Up @@ -75,6 +70,7 @@ var (
// ParseFlags parses the configuration flags specific to PostgreSQL and TimescaleDB
func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
cache.ParseFlags(fs, &cfg.CacheConfig)
ingestor.ParseFlags(fs, &cfg.IngestorFlags)

fs.StringVar(&cfg.AppName, "db.app", DefaultApp, "This sets the application_name in database connection string. "+
"This is helpful during debugging when looking at pg_stat_activity.")
Expand All @@ -85,9 +81,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.StringVar(&cfg.Database, "db.name", defaultDBName, "Database name.")
fs.StringVar(&cfg.SslMode, "db.ssl-mode", defaultSSLMode, "TimescaleDB connection ssl mode. If you do not want to use ssl, pass 'allow' as value.")
fs.DurationVar(&cfg.DbConnectionTimeout, "db.connection-timeout", defaultConnectionTime, "Timeout for establishing the connection between Promscale and TimescaleDB.")
fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+
"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+
"However, setting this to true will save your resources that may be required during decompression. ")

fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics/traces to database. "+
"By default, this will be set based on the number of CPUs available to the DB Promscale is connected to.")
fs.IntVar(&cfg.WriterPoolSize, "db.connections.writer-pool.size", defaultPoolSize, "Maximum size of the writer pool of database connections. This defaults to 50% of max_connections "+
Expand All @@ -102,18 +96,17 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
fs.BoolVar(&cfg.EnableStatementsCache, "db.statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+
"Disable if using PgBouncer")
fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", true, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")

return cfg
}

// Validate checks cfg in three stages and returns the first error encountered:
// the connection settings, then the ingestor settings, and finally the cache
// settings, which are validated against lcfg.
func Validate(cfg *Config, lcfg limits.Config) error {
	err := cfg.validateConnectionSettings()
	if err == nil {
		// Only reached when the connection settings are valid.
		err = ingestor.Validate(&cfg.IngestorFlags)
	}
	if err != nil {
		return err
	}
	return cache.Validate(&cfg.CacheConfig, lcfg)
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/pgmodel/ingestor/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type pgxDispatcher struct {

var _ model.Dispatcher = &pgxDispatcher{}

func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) {
numCopiers := cfg.NumCopiers
func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, params *Parameters) (*pgxDispatcher, error) {
numCopiers := params.NumCopiers
if numCopiers < 1 {
log.Warn("msg", "num copiers less than 1, setting to 1")
numCopiers = 1
Expand All @@ -66,7 +66,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh)))
metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) })

if cfg.IgnoreCompressedChunks {
if params.Config.IgnoreCompressedChunks {
// Handle decompression to not decompress anything.
handleDecompression = skipDecompression
}
Expand All @@ -76,7 +76,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
}

labelArrayOID := model.GetCustomTypeOID(model.LabelArray)
labelsCache, err := cache.NewInvertedLabelsCache(cfg.InvertedLabelsCacheSize)
labelsCache, err := cache.NewInvertedLabelsCache(params.InvertedLabelsCacheSize)
if err != nil {
return nil, err
}
Expand All @@ -94,7 +94,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
invertedLabelsCache: labelsCache,
exemplarKeyPosCache: eCache,
completeMetricCreation: make(chan struct{}, 1),
asyncAcks: cfg.MetricsAsyncAcks,
asyncAcks: params.Config.MetricsAsyncAcks,
copierReadRequestCh: copierReadRequestCh,
// set to run at half our deletion interval
seriesEpochRefresh: time.NewTicker(30 * time.Minute),
Expand All @@ -112,7 +112,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac

go inserter.runCompleteMetricCreationWorker()

if !cfg.DisableEpochSync {
if !params.Config.DisableEpochSync {
inserter.doneWG.Add(1)
go func() {
defer inserter.doneWG.Done()
Expand Down
37 changes: 37 additions & 0 deletions pkg/pgmodel/ingestor/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package ingestor

import (
"flag"
"time"

"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
)

// Config holds the ingestor settings. Every field except DisableEpochSync is
// bound to a command-line flag by ParseFlags.
type Config struct {
// MetricsAsyncAcks is bound to the "metrics.async-acks" flag (default false).
// When true, the inserter does not wait after inserting metric data.
MetricsAsyncAcks bool
// TracesAsyncAcks is bound to the "tracing.async-acks" flag (default true).
// When true, the inserter does not wait after inserting trace data.
TracesAsyncAcks bool
// DisableEpochSync has no flag in ParseFlags; it is only set directly by
// callers. When true, newPgxDispatcher skips starting one of its background
// goroutines (presumably the series-epoch sync; verify in dispatcher.go).
DisableEpochSync bool
// IgnoreCompressedChunks is bound to the
// "metrics.ignore-samples-written-to-compressed-chunks" flag (default false).
// When true, samples written to compressed chunks are ignored/dropped
// instead of decompressing the chunks.
IgnoreCompressedChunks bool
// TracesBatchTimeout is bound to the "tracing.batch-timeout" flag
// (default trace.DefaultBatchTimeout).
TracesBatchTimeout time.Duration
// TracesMaxBatchSize is bound to the "tracing.max-batch-size" flag
// (default trace.DefaultBatchSize).
TracesMaxBatchSize int
// TracesBatchWorkers is bound to the "tracing.batch-workers" flag
// (default trace.DefaultBatchWorkers).
TracesBatchWorkers int
}

// ParseFlags registers the ingestor's command-line flags on fs and binds each
// one to the matching field of cfg. DisableEpochSync is intentionally not
// exposed as a flag here.
func ParseFlags(fs *flag.FlagSet, cfg *Config) {
	// Help texts are kept together so the registrations below stay readable.
	const (
		ignoreCompressedUsage = "Ignore/drop samples that are being written to compressed chunks. " +
			"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. " +
			"However, setting this to true will save your resources that may be required during decompression. "
		metricsAsyncUsage = "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss."
		tracesAsyncUsage  = "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss."
		maxBatchUsage     = "Maximum size of trace batch that is written to DB"
		batchTimeoutUsage = "Timeout after new trace batch is created"
		batchWorkersUsage = "Number of workers responsible for creating trace batches. Defaults to number of CPUs."
	)

	// Metric ingestion flags.
	fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, ignoreCompressedUsage)
	fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, metricsAsyncUsage)

	// Trace ingestion flags; defaults come from the trace package.
	fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", true, tracesAsyncUsage)
	fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, maxBatchUsage)
	fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, batchTimeoutUsage)
	fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, batchWorkersUsage)
}

// Validate checks the ingestor configuration. It currently performs no checks
// on cfg and always returns nil.
func Validate(cfg *Config) error {
return nil
}
42 changes: 20 additions & 22 deletions pkg/pgmodel/ingestor/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,6 @@ import (
"github.com/timescale/promscale/pkg/tracer"
)

// Cfg is the flat ingestor configuration passed to NewPgxIngestor and
// newPgxDispatcher.
type Cfg struct {
// MetricsAsyncAcks is copied into the dispatcher's asyncAcks field.
MetricsAsyncAcks bool
// TracesAsyncAcks is passed to trace.NewDispatcher.
TracesAsyncAcks bool
// NumCopiers is the copier count; newPgxDispatcher treats values below 1 as 1.
// NewPgxIngestor also uses it, unchanged, as the trace batcher's Writers value.
NumCopiers int
// DisableEpochSync, when true, makes newPgxDispatcher skip starting one of
// its background goroutines (presumably the series-epoch sync; verify).
DisableEpochSync bool
// IgnoreCompressedChunks, when true, makes newPgxDispatcher install
// skipDecompression as the decompression handler.
IgnoreCompressedChunks bool
// InvertedLabelsCacheSize is passed to cache.NewInvertedLabelsCache.
InvertedLabelsCacheSize uint64
// TracesBatchTimeout becomes the trace batcher's BatchTimeout.
TracesBatchTimeout time.Duration
// TracesMaxBatchSize becomes the trace batcher's MaxBatchSize.
TracesMaxBatchSize int
// TracesBatchWorkers is not read by the code visible here.
TracesBatchWorkers int
}

// DBIngestor ingest the TimeSeries data into Timescale database.
type DBIngestor struct {
sCache cache.SeriesCache
Expand All @@ -43,33 +31,42 @@ type DBIngestor struct {
closed *atomic.Bool
}

// Parameters bundles the inputs NewPgxIngestor and newPgxDispatcher need:
// values computed by the caller plus the flag-driven Config.
type Parameters struct {
// NumCopiers is the copier count; newPgxDispatcher treats values below 1 as 1.
// NewPgxIngestor also uses it, unchanged, as the trace batcher's Writers value.
NumCopiers int
// InvertedLabelsCacheSize is passed to cache.NewInvertedLabelsCache.
InvertedLabelsCacheSize uint64
// Config holds the flag-driven ingestor settings. NewPgxIngestor and
// newPgxDispatcher dereference it without a nil check, so it must be non-nil;
// only NewPgxIngestorForTests substitutes an empty Config when it is nil.
Config *Config
}

// NewPgxIngestor returns a new Ingestor that uses connection pool and a metrics cache
// for caching metric table names.
func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*DBIngestor, error) {
dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, cfg)
func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, params *Parameters) (*DBIngestor, error) {
dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, params)
if err != nil {
return nil, err
}

batcherConfg := trace.BatcherConfig{
MaxBatchSize: cfg.TracesMaxBatchSize,
BatchTimeout: cfg.TracesBatchTimeout,
Writers: cfg.NumCopiers,
MaxBatchSize: params.Config.TracesMaxBatchSize,
BatchTimeout: params.Config.TracesBatchTimeout,
Writers: params.NumCopiers,
}
traceWriter := trace.NewWriter(conn)
return &DBIngestor{
sCache: sCache,
dispatcher: dispatcher,
tWriter: trace.NewDispatcher(traceWriter, cfg.TracesAsyncAcks, batcherConfg),
tWriter: trace.NewDispatcher(traceWriter, params.Config.TracesAsyncAcks, batcherConfg),
closed: atomic.NewBool(false),
}, nil
}

// NewPgxIngestorForTests returns a new Ingestor that write to PostgreSQL using PGX
// with an empty config, a new default size metrics cache and a non-ha-aware data parser
func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) {
func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Parameters) (*DBIngestor, error) {
if cfg == nil {
cfg = &Cfg{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2}
cfg = &Parameters{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2}
}
if cfg.Config == nil {
cfg.Config = &Config{}
}
cacheConfig := cache.DefaultConfig
c := cache.NewMetricCache(cacheConfig)
Expand Down Expand Up @@ -101,8 +98,9 @@ func (ingestor *DBIngestor) IngestTraces(ctx context.Context, traces ptrace.Trac

// IngestMetrics transforms and ingests the timeseries data into Timescale database.
// input:
// req the WriteRequest backing tts. It will be added to our WriteRequest
// pool when it is no longer needed.
//
// req the WriteRequest backing tts. It will be added to our WriteRequest
// pool when it is no longer needed.
func (ingestor *DBIngestor) IngestMetrics(ctx context.Context, r *prompb.WriteRequest) (numInsertablesIngested uint64, numMetadataIngested uint64, err error) {
if ingestor.closed.Load() {
return 0, 0, fmt.Errorf("ingestor is closed and can't ingest metrics")
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/ingestor_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ func TestPGXInserterInsertData(t *testing.T) {
if err != nil {
t.Fatalf("error setting up mock cache: %s", err.Error())
}
inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10, NumCopiers: 2})
inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Parameters{Config: &Config{DisableEpochSync: true}, InvertedLabelsCacheSize: 10, NumCopiers: 2})
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ func TestSQLDropMetricChunk(t *testing.T) {
}

c := cache.NewMetricCache(cache.DefaultConfig)
ingestor, err := ingstr.NewPgxIngestor(pgxconn.NewPgxConn(db), c, scache, nil, &ingstr.Cfg{
DisableEpochSync: true,
ingestor, err := ingstr.NewPgxIngestor(pgxconn.NewPgxConn(db), c, scache, nil, &ingstr.Parameters{
Config: &ingstr.Config{DisableEpochSync: true},
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: 2,
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/ha_single_promscale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func prepareWriterWithHa(db *pgxpool.Pool, t testing.TB) (*util.ManualTicker, ht
dataParser.AddPreprocessor(ha.NewFilter(haService))
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}

ing, err := ingestor.NewPgxIngestor(pgxconn.NewPgxConn(db), mCache, sCache, nil, &ingestor.Cfg{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2,
ing, err := ingestor.NewPgxIngestor(pgxconn.NewPgxConn(db), mCache, sCache, nil, &ingestor.Parameters{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2, Config: &ingestor.Config{},
})
if err != nil {
t.Fatalf("could not create ingestor: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func TestInsertInCompressedChunks(t *testing.T) {

// With decompress chunks being false.
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Cfg{
IgnoreCompressedChunks: true,
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Parameters{
Config: &ingstr.Config{IgnoreCompressedChunks: true},
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: 2,
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/jaeger_store_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func TestJaegerStorageIntegration(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
withDB(t, "jaeger_storage_integration_tests", func(db *pgxpool.Pool, t testing.TB) {
cfg := &ingestor.Cfg{
cfg := &ingestor.Parameters{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: runtime.NumCPU() / 2,
TracesAsyncAcks: true, // To make GetLargeSpans happy, otherwise it takes quite a few time to ingest.
Config: &ingestor.Config{TracesAsyncAcks: true}, // To make GetLargeSpans happy, otherwise it takes quite a few time to ingest.
}
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), cfg)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/metric_ingest_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func BenchmarkMetricIngest(b *testing.B) {

withDB(b, "bench_e2e_metric_ingest", func(db *pgxpool.Pool, t testing.TB) {
b.StopTimer()
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Cfg{
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Parameters{
NumCopiers: 8,
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
})
Expand Down Expand Up @@ -119,7 +119,7 @@ func BenchmarkNewSeriesIngestion(b *testing.B) {

for i := 0; i < b.N; i++ {
withDB(b, "bench_e2e_new_series_ingest", func(db *pgxpool.Pool, t testing.TB) {
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Cfg{
metricsIngestor, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingestor.Parameters{
NumCopiers: 8,
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/trace_ingest_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func BenchmarkTracesIngest(b *testing.B) {
for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
withDB(b, "trace_ingest_bench", func(db *pgxpool.Pool, t testing.TB) {
cfg := &ingestor.Cfg{
cfg := &ingestor.Parameters{
InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize,
NumCopiers: runtime.NumCPU() / 2,
TracesAsyncAcks: c.async,
Config: &ingestor.Config{TracesAsyncAcks: c.async},
}
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), cfg)
require.NoError(t, err)
Expand Down