Skip to content

Commit

Permalink
Merge pull request #391 from xmidt-org/denopink/feat/improve-outbound…
Browse files Browse the repository at this point in the history
…_dropped_messages-metric

feat: improve outbound_dropped_messages metric
  • Loading branch information
denopink committed Feb 27, 2024
2 parents 795d6ff + e5d8547 commit b6be11b
Show file tree
Hide file tree
Showing 10 changed files with 648 additions and 167 deletions.
24 changes: 12 additions & 12 deletions ackDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"time"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/webpa-common/v2/device"
"go.uber.org/zap"

Expand All @@ -26,10 +26,10 @@ type ackDispatcher struct {
hostname string
logger *zap.Logger
timeout time.Duration
AckSuccess metrics.Counter
AckFailure metrics.Counter
AckSuccessLatency metrics.Histogram
AckFailureLatency metrics.Histogram
AckSuccess CounterVec
AckFailure CounterVec
AckSuccessLatency HistogramVec
AckFailureLatency HistogramVec
}

// NewAckDispatcher is an ackDispatcher factory which processes outbound events
Expand Down Expand Up @@ -130,33 +130,33 @@ func (d *ackDispatcher) OnDeviceEvent(event *device.Event) {
p := dm.PartnerIDClaim()
t := m.Type.FriendlyName()
// Metric labels
ls := []string{qosLevelLabel, l.String(), partnerIDLabel, p, messageType, t}
ls := prometheus.Labels{qosLevelLabel: l.String(), partnerIDLabel: p, messageType: t}
ctx, cancel := context.WithTimeout(context.Background(), d.timeout)
defer cancel()

// Observe the latency of sending an ack to the source device
ackFailure := false
defer func(s time.Time) {
d.recordAckLatency(s, ackFailure, ls...)
d.recordAckLatency(s, ackFailure, ls)
}(time.Now())

if _, err := event.Device.Send(r.WithContext(ctx)); err != nil {
d.logger.Error("Error dispatching QOS ack", zap.Any("qosLevel", l), zap.Any("partnerID", p), zap.Any("messageType", t), zap.Error(err))
d.AckFailure.With(ls...).Add(1)
d.AckFailure.With(ls).Add(1)
ackFailure = true
return
}

d.AckSuccess.With(ls...).Add(1)
d.AckSuccess.With(ls).Add(1)
}

// recordAckLatency records the latency for both successful and failed acks
func (d *ackDispatcher) recordAckLatency(s time.Time, f bool, l ...string) {
func (d *ackDispatcher) recordAckLatency(s time.Time, f bool, l prometheus.Labels) {
switch {
case f:
d.AckFailureLatency.With(l...).Observe(time.Since(s).Seconds())
d.AckFailureLatency.With(l).Observe(time.Since(s).Seconds())

default:
d.AckSuccessLatency.With(l...).Observe(time.Since(s).Seconds())
d.AckSuccessLatency.With(l).Observe(time.Since(s).Seconds())
}
}
7 changes: 4 additions & 3 deletions ackDispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,7 +47,7 @@ func testAckDispatcherOnDeviceEventQOSEventFailure(t *testing.T) {
mAckFailureLatency := new(mockHistogram)
p, mt, qosl := failure_case, failure_case, failure_case
// Setup labels for metrics
l := []string{qosLevelLabel, qosl, partnerIDLabel, p, messageType, mt}
l := prometheus.Labels{qosLevelLabel: qosl, partnerIDLabel: p, messageType: mt}
om := OutboundMeasures{
AckSuccess: mAckSuccess,
AckFailure: mAckFailure,
Expand Down Expand Up @@ -169,7 +170,7 @@ func testAckDispatcherOnDeviceEventQOSDeviceFailure(t *testing.T) {
require.True(ok)
// Setup labels for metrics
dm := genTestMetadata()
l := []string{qosLevelLabel, m.QualityOfService.Level().String(), partnerIDLabel, dm.PartnerIDClaim(), messageType, m.Type.FriendlyName()}
l := prometheus.Labels{qosLevelLabel: m.QualityOfService.Level().String(), partnerIDLabel: dm.PartnerIDClaim(), messageType: m.Type.FriendlyName()}
// Setup metrics for the dispatcher
om := OutboundMeasures{
AckSuccess: mAckSuccess,
Expand Down Expand Up @@ -493,7 +494,7 @@ func testAckDispatcherOnDeviceEventQOSSuccess(t *testing.T) {
}

// Setup labels for metrics
l := []string{qosLevelLabel, qosl, partnerIDLabel, p, messageType, mt}
l := prometheus.Labels{qosLevelLabel: qosl, partnerIDLabel: p, messageType: mt}
// Setup metrics for the dispatcher
om := OutboundMeasures{
AckSuccess: mAckSuccess,
Expand Down

0 comments on commit b6be11b

Please sign in to comment.