Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue percentage to libbeat metrics #39205

Merged
merged 14 commits into from Apr 30, 2024
66 changes: 34 additions & 32 deletions libbeat/monitoring/report/log/log.go
Expand Up @@ -37,34 +37,36 @@ import (
// TODO: Replace this with a proper solution that uses the metric type from
// where it is defined. See: https://github.com/elastic/beats/issues/5433
// gauges lists the metric keys whose values are point-in-time gauges rather
// than monotonically increasing counters; keys absent from this map are
// treated as counters. (The scraped diff showed both the old and new entry
// sets back to back, which would be duplicate map keys — a compile error in
// Go; this is the resulting set with the two new queue metrics included.)
var gauges = map[string]bool{
	"libbeat.output.events.active":           true,
	"libbeat.pipeline.events.active":         true,
	"libbeat.pipeline.clients":               true,
	"libbeat.pipeline.queue.max_events":      true,
	"libbeat.pipeline.queue.events.full.pct": true,
	"libbeat.config.module.running":          true,
	"registrar.states.current":               true,
	"filebeat.events.active":                 true,
	"filebeat.harvester.running":             true,
	"filebeat.harvester.open_files":          true,
	"beat.memstats.memory_total":             true,
	"beat.memstats.memory_alloc":             true,
	"beat.memstats.rss":                      true,
	"beat.memstats.gc_next":                  true,
	"beat.info.uptime.ms":                    true,
	"beat.cgroup.memory.mem.usage.bytes":     true,
	"beat.cpu.user.ticks":                    true,
	"beat.cpu.system.ticks":                  true,
	"beat.cpu.total.value":                   true,
	"beat.cpu.total.ticks":                   true,
	"beat.handles.open":                      true,
	"beat.handles.limit.hard":                true,
	"beat.handles.limit.soft":                true,
	"beat.runtime.goroutines":                true,
	"system.load.1":                          true,
	"system.load.5":                          true,
	"system.load.15":                         true,
	"system.load.norm.1":                     true,
	"system.load.norm.5":                     true,
	"system.load.norm.15":                    true,
}

// isGauge returns true when the given metric key name represents a gauge value.
Expand Down Expand Up @@ -249,16 +251,16 @@ func toKeyValuePairs(snaps map[string]monitoring.FlatSnapshot) []interface{} {
for name, snap := range snaps {
data := make(mapstr.M, snapshotLen(snap))
for k, v := range snap.Bools {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
for k, v := range snap.Floats {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
for k, v := range snap.Ints {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
for k, v := range snap.Strings {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
if len(data) > 0 {
args = append(args, logp.Reflect(name, data))
Expand Down
21 changes: 18 additions & 3 deletions libbeat/publisher/pipeline/monitoring.go
Expand Up @@ -67,8 +67,9 @@ type metricsObserverVars struct {
activeEvents *monitoring.Uint

// queue metrics
queueACKed *monitoring.Uint
queueMaxEvents *monitoring.Uint
queueACKed *monitoring.Uint
queueMaxEvents *monitoring.Uint
percentQueueFull *monitoring.Float
}

func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
Expand All @@ -92,7 +93,8 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
queueACKed: monitoring.NewUint(reg, "queue.acked"),
queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"),

activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge
activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge
percentQueueFull: monitoring.NewFloat(reg, "queue.events.full.pct"),
},
}
}
Expand Down Expand Up @@ -121,12 +123,23 @@ func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() }
// (client) a new event entered the pipeline: bump the total and active
// counts, then refresh the queue-fullness gauge.
func (o *metricsObserver) newEvent() {
	vars := &o.vars
	vars.events.Inc()
	vars.activeEvents.Inc()
	o.setPercentageFull()
}

// setPercentageFull is used interally to set the `queue.full` metric
func (o *metricsObserver) setPercentageFull() {
maxEvt := o.vars.queueMaxEvents.Get()
if maxEvt != 0 {
pct := float64(o.vars.activeEvents.Get()) / float64(maxEvt)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
o.vars.percentQueueFull.Set(pct)
}
}

// (client) an event was filtered out (deliberately or because it failed)
// and therefore leaves the set of active events.
func (o *metricsObserver) filteredEvent() {
	vars := &o.vars
	vars.filtered.Inc()
	vars.activeEvents.Dec()
	o.setPercentageFull()
}

// (client) managed to push an event into the publisher pipeline
Expand All @@ -138,6 +151,7 @@ func (o *metricsObserver) publishedEvent() {
// (client) an event could not be pushed into the pipeline; count the
// failure and drop it from the active set.
func (o *metricsObserver) failedPublishEvent() {
	vars := &o.vars
	vars.failed.Inc()
	vars.activeEvents.Dec()
	o.setPercentageFull()
}

//
Expand All @@ -148,6 +162,7 @@ func (o *metricsObserver) failedPublishEvent() {
// (queue) n events were acknowledged by the queue; record them and remove
// them from the active-event count before refreshing the fullness gauge.
func (o *metricsObserver) queueACKed(n int) {
	acked := uint64(n)
	o.vars.queueACKed.Add(acked)
	o.vars.activeEvents.Sub(acked)
	o.setPercentageFull()
}

// (queue) maximum queue event capacity
Expand Down