Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Add telemetry for metrics_ingestion_path & json_endpoint_used #1695

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ We use the following categories for changes:
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## Unreleased

### Added
- Telemetry for `promscale_metrics_ingestion_path` and `promscale_metrics_ingestion_json_endpoint_used` [#1695]

## [0.15.0] - 2022-10-11

### Added
Expand Down
24 changes: 24 additions & 0 deletions pkg/api/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/timescale/promscale/pkg/api/parser/protobuf"
"github.com/timescale/promscale/pkg/api/parser/text"
"github.com/timescale/promscale/pkg/prompb"
"github.com/timescale/promscale/pkg/telemetry"
)

type formatParser func(*http.Request, *prompb.WriteRequest) error
Expand Down Expand Up @@ -62,6 +63,8 @@ func (d DefaultParser) ParseRequest(r *http.Request, req *prompb.WriteRequest) e
return fmt.Errorf("parser error: %w", err)
}

updateTelemetry(mediaType)

if len(req.Timeseries) == 0 {
return nil
}
Expand All @@ -81,3 +84,24 @@ func (d DefaultParser) ParseRequest(r *http.Request, req *prompb.WriteRequest) e

return nil
}

func InitTelemetry() {
telemetry.Registry.Update("metrics_ingestion_path", "no_ingestion")
telemetry.Registry.Update("metrics_ingestion_json_endpoint_used", "0")
}

func updateTelemetry(parser string) {
switch parser {
case "application/x-protobuf":
telemetry.Registry.Update("metrics_ingestion_path", "protobuf")
case "application/json":
telemetry.Registry.Update("metrics_ingestion_path", "json")
telemetry.Registry.Update("metrics_ingestion_json_endpoint_used", "1")
case "text/plain":
telemetry.Registry.Update("metrics_ingestion_path", "text_plain")
case "application/openmetrics-text":
telemetry.Registry.Update("metrics_ingestion_path", "text_open_metrics")
default:
telemetry.Registry.Update("metrics_ingestion_path", "none")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: unknown seems better

}
}
45 changes: 7 additions & 38 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,8 @@ import (
"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/rules/adapters"
"github.com/timescale/promscale/pkg/telemetry"
"github.com/timescale/promscale/pkg/util"
)

var (
// These metrics are used to track telemetry by registering
// in telemetryEngine.RegisterDynamicMetadata()
rulesEnabled = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "rules",
Name: "enabled",
Help: "Promscale rules is enabled or not.",
},
)
alertingEnabled = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "alerting",
Name: "enabled",
Help: "Promscale alerting is enabled or not.",
},
)
)

func init() {
prometheus.MustRegister(rulesEnabled, alertingEnabled)
}

type Manager struct {
ctx context.Context
rulesManager *prom_rules.Manager
Expand Down Expand Up @@ -98,14 +72,9 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
return manager, manager.getReloader(cfg), nil
}

func RegisterForTelemetry(t telemetry.Engine) error {
if err := t.RegisterDynamicMetadata("rules_enabled", rulesEnabled); err != nil {
return fmt.Errorf("register dynamic 'promscale_rules_enabled' metric for telemetry: %w", err)
}
if err := t.RegisterDynamicMetadata("alerting_enabled", alertingEnabled); err != nil {
return fmt.Errorf("register dynamic 'promscale_alerting_enabled' metric for telemetry: %w", err)
}
return nil
func InitTelemetry() {
telemetry.Registry.Update("rules_enabled", "0")
telemetry.Registry.Update("alerting_enabled", "0")
}

func (m *Manager) getReloader(cfg *Config) func() error {
Expand All @@ -124,17 +93,17 @@ func (m *Manager) getReloader(cfg *Config) func() error {

func (m *Manager) updateTelemetry(cfg *Config) {
if cfg.ContainsRules() {
rulesEnabled.Set(1)
telemetry.Registry.Update("rules_enabled", "1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how I feel about these hardcoded strings in various places

if cfg.ContainsAlertingConfig() {
alertingEnabled.Set(1)
telemetry.Registry.Update("alerting_enabled", "1")
} else {
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
alertingEnabled.Set(0)
telemetry.Registry.Update("alerting_enabled", "0")
}
return
}
log.Debug("msg", "Rules files not found. Rules and alerting configuration will not be initialized")
rulesEnabled.Set(0)
telemetry.Registry.Update("rules_enabled", "0")
}

func (m *Manager) WithPostRulesProcess(f prom_rules.RuleGroupPostProcessFunc) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/timescale/promscale/pkg/api"
"github.com/timescale/promscale/pkg/api/parser"
jaegerStore "github.com/timescale/promscale/pkg/jaeger/store"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgclient"
Expand Down Expand Up @@ -368,8 +369,7 @@ func initTelemetryEngine(client *pgclient.Client) (telemetry.Engine, error) {
if err := trace.RegisterTelemetryMetrics(t); err != nil {
log.Error("msg", "error registering metrics for Jaeger-ingest telemetry", "err", err.Error())
}
if err := rules.RegisterForTelemetry(t); err != nil {
log.Error("msg", "error registering metrics for rules telemetry", "err", err.Error())
}
parser.InitTelemetry()
rules.InitTelemetry()
return t, nil
}
27 changes: 27 additions & 0 deletions pkg/telemetry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package telemetry

import "sync"

type reg struct {
r sync.Map
}

func (r *reg) Update(telemetryName, value string) {
r.r.Store(telemetryName, value)
}

func (r *reg) metadata() (m Metadata) {
m = Metadata{}
r.r.Range(func(telemetryName, value interface{}) bool {
m[telemetryName.(string)] = value.(string)
return true
})
return m
}

// Registry is a telemetry holder that is mutable and can be filled from anywhere in Promscale.
var Registry reg
71 changes: 9 additions & 62 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ var ErrInvalidMetric = fmt.Errorf("metric not a counter or gauge")
// the _timescaledb_catalog.metadata table.
type Engine interface {
// RegisterMetric registers a Prometheus metric with a column name. This metric is
// monitored every telemetrySync and updated in the telemetry table.
// monitored every telemetrySync and updated in the promscale_instance_information table.
RegisterMetric(columnName string, gaugeOrCounterMetric ...prometheus.Metric) error

// RegisterDynamicMetadata is a Prometheus metric that changes regularly. This is monitored
// every telemetrySync and updated in the telemetry table.
RegisterDynamicMetadata(columnName string, gauge prometheus.Metric) error
Start()
Stop()
}
Expand All @@ -50,9 +46,7 @@ type engineImpl struct {

promqlEngine *promql.Engine
promqlQueryable promql.Queryable

metrics sync.Map
dynamicMetadata sync.Map
}

func NewEngine(conn pgxconn.PgxConn, uuid [16]byte, promqlQueryable promql.Queryable) (Engine, error) {
Expand Down Expand Up @@ -120,56 +114,19 @@ func (t *engineImpl) writeToTimescaleMetadataTable(m Metadata) {
return
}
// Try to update via Promscale extension.
if err := t.syncWithMetadataTable(metadataUpdateWithExtension, m); err != nil {
if err := syncWithMetadataTable(t.conn, metadataUpdateWithExtension, m); err != nil {
// Promscale extension not installed. Try to attempt to write directly as a rare attempt
// in case we fix the _timescaledb_catalog.metadata permissions in the future.
_ = t.syncWithMetadataTable(metadataUpdateNoExtension, m)
}
}

func (t *engineImpl) RegisterDynamicMetadata(telemetryName string, gauge prometheus.Metric) error {
if !isGauge(gauge) {
return ErrInvalidMetric
_ = syncWithMetadataTable(t.conn, metadataUpdateNoExtension, m)
}
t.dynamicMetadata.Store(telemetryName, gauge)
return nil
}

func (t *engineImpl) syncDynamicMetadata() error {
var (
err error
val float64
metadata = Metadata{}
)
t.dynamicMetadata.Range(func(key, value interface{}) bool {
columnName := key.(string)
metric := value.(prometheus.Metric)
val, err = util.ExtractMetricValue(metric)
if err != nil {
err = fmt.Errorf("extracting metric value of stat '%s': %w", columnName, err)
return false
}
var state string
switch val {
case 0:
state = "false"
case 1:
state = "true"
default:
err = fmt.Errorf("invalid state value '%f' for stat '%s'", val, columnName)
}
metadata[columnName] = state
return true
})
if err != nil {
return err
}
t.writeToTimescaleMetadataTable(metadata)
return nil
func (t *engineImpl) syncRegistry() {
t.writeToTimescaleMetadataTable(Registry.metadata())
}

func (t *engineImpl) syncWithMetadataTable(queryFormat string, m Metadata) error {
batch := t.conn.NewBatch()
func syncWithMetadataTable(conn pgxconn.PgxConn, queryFormat string, m Metadata) error {
batch := conn.NewBatch()
for key, metadata := range m {
safe := pgutf8str.Text{}
if err := safe.Set(metadata); err != nil {
Expand All @@ -179,7 +136,7 @@ func (t *engineImpl) syncWithMetadataTable(queryFormat string, m Metadata) error
batch.Queue(query, key, safe, true)
}

results, err := t.conn.SendBatch(context.Background(), batch)
results, err := conn.SendBatch(context.Background(), batch)
if err != nil {
return fmt.Errorf("error sending batch: %w", err)
}
Expand Down Expand Up @@ -209,9 +166,7 @@ func (t *engineImpl) Sync() error {
if err := t.syncWithInfoTable(); err != nil {
return fmt.Errorf("sync info table: %w", err)
}
if err := t.syncDynamicMetadata(); err != nil {
return fmt.Errorf("sync dynamic metadata: %w", err)
}
t.syncRegistry()
t.housekeeping()
return nil
}
Expand Down Expand Up @@ -300,14 +255,6 @@ func isCounterOrGauge(metric prometheus.Metric) bool {
}
}

func isGauge(metric prometheus.Metric) bool {
switch metric.(type) {
case prometheus.Gauge:
return true
}
return false
}

// syncInfoTable stats with promscale_instance_information table.
func (t *engineImpl) syncInfoTable(stats map[string]float64) error {
pgUUID := new(pgtype.UUID)
Expand Down