Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue percentage to libbeat metrics #39205

Merged
merged 14 commits into from Apr 30, 2024
21 changes: 18 additions & 3 deletions libbeat/publisher/pipeline/monitoring.go
Expand Up @@ -67,8 +67,9 @@ type metricsObserverVars struct {
activeEvents *monitoring.Uint

// queue metrics
queueACKed *monitoring.Uint
queueMaxEvents *monitoring.Uint
queueACKed *monitoring.Uint
queueMaxEvents *monitoring.Uint
percentQueueFull *monitoring.Float
}

func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
Expand All @@ -92,7 +93,8 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
queueACKed: monitoring.NewUint(reg, "queue.acked"),
queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"),

activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge
activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge
percentQueueFull: monitoring.NewFloat(reg, "queue.full.pct"),
},
}
}
Expand Down Expand Up @@ -121,12 +123,23 @@ func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() }
func (o *metricsObserver) newEvent() {
o.vars.events.Inc()
o.vars.activeEvents.Inc()
o.setPercentageFull()
}

// setPercentageFull is used interally to set the `queue.full` metric
func (o *metricsObserver) setPercentageFull() {
maxEvt := o.vars.queueMaxEvents.Get()
if maxEvt != 0 {
pct := float64(o.vars.activeEvents.Get()) / float64(maxEvt)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
o.vars.percentQueueFull.Set(pct)
}
}

// (client) event is filtered out (on purpose or failed)
func (o *metricsObserver) filteredEvent() {
o.vars.filtered.Inc()
o.vars.activeEvents.Dec()
o.setPercentageFull()
}

// (client) managed to push an event into the publisher pipeline
Expand All @@ -138,6 +151,7 @@ func (o *metricsObserver) publishedEvent() {
func (o *metricsObserver) failedPublishEvent() {
o.vars.failed.Inc()
o.vars.activeEvents.Dec()
o.setPercentageFull()
}

//
Expand All @@ -148,6 +162,7 @@ func (o *metricsObserver) failedPublishEvent() {
func (o *metricsObserver) queueACKed(n int) {
o.vars.queueACKed.Add(uint64(n))
o.vars.activeEvents.Sub(uint64(n))
o.setPercentageFull()
}

// (queue) maximum queue event capacity
Expand Down