Skip to content

Commit

Permalink
Add metrics labels (#658)
Browse files Browse the repository at this point in the history
* Add labels to metrics
* upgrade memberlist to 0.4.0

Co-authored-by: R.B. Boyer <rb@hashicorp.com>
  • Loading branch information
huikang and rboyer committed Aug 8, 2022
1 parent b3a2384 commit 3805ead
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 27 deletions.
2 changes: 1 addition & 1 deletion coordinate/client.go
Expand Up @@ -218,7 +218,7 @@ func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coo
return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT)
}
if rtt == 0 {
metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "coordinate", "zero-rtt"}, 1, c.config.MetricLabels)
}

rttSeconds := c.latencyFilter(node, rtt.Seconds())
Expand Down
7 changes: 7 additions & 0 deletions coordinate/config.go
@@ -1,5 +1,9 @@
package coordinate

import (
"github.com/armon/go-metrics"
)

// Config is used to set the parameters of the Vivaldi-based coordinate mapping
// algorithm.
//
Expand Down Expand Up @@ -52,6 +56,9 @@ type Config struct {
// GravityRho is a tuning factor that sets how much gravity has an effect
// to try to re-center coordinates. See [2] for more details.
GravityRho float64

// metricLabels is the slice of labels to put on all emitted metrics
MetricLabels []metrics.Label
}

// DefaultConfig returns a Config that has some default values suitable for
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -13,7 +13,7 @@ require (
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/hashicorp/logutils v1.0.0
github.com/hashicorp/mdns v1.0.4
github.com/hashicorp/memberlist v0.3.0
github.com/hashicorp/memberlist v0.4.0
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mitchellh/cli v1.1.0
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Expand Up @@ -37,8 +37,8 @@ github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.4 h1:sY0CMhFmjIPDMlTB+HfymFHCaYLhgifZ0QhjaYKD/UQ=
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA=
github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/memberlist v0.4.0 h1:k3uda5gZcltmafuFF+UFqNEl5PrH+yPZ4zkjp1f/H/8=
github.com/hashicorp/memberlist v0.4.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE=
Expand Down Expand Up @@ -91,8 +91,9 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand Down
4 changes: 4 additions & 0 deletions serf/config.go
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/memberlist"
)

Expand Down Expand Up @@ -262,6 +263,9 @@ type Config struct {
// contain alphanumeric, dashes and '.'characters
// and sets maximum length to 128 characters
ValidateNodeNames bool

// MetricLabels is a map of optional labels to apply to all metrics emitted.
MetricLabels []metrics.Label
}

// Init allocates the subdata structures
Expand Down
8 changes: 4 additions & 4 deletions serf/delegate.go
Expand Up @@ -30,7 +30,7 @@ func (d *delegate) NotifyMsg(buf []byte) {
if len(buf) == 0 {
return
}
metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf)))
metrics.AddSampleWithLabels([]string{"serf", "msgs", "received"}, float32(len(buf)), d.serf.metricLabels)

rebroadcast := false
rebroadcastQueue := d.serf.broadcasts
Expand Down Expand Up @@ -142,7 +142,7 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
for _, msg := range msgs {
lm := len(msg)
bytesUsed += lm + overhead
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
metrics.AddSampleWithLabels([]string{"serf", "msgs", "sent"}, float32(lm), d.serf.metricLabels)
}

// Get any additional query broadcasts
Expand All @@ -151,7 +151,7 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
for _, m := range queryMsgs {
lm := len(m)
bytesUsed += lm + overhead
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
metrics.AddSampleWithLabels([]string{"serf", "msgs", "sent"}, float32(lm), d.serf.metricLabels)
}
msgs = append(msgs, queryMsgs...)
}
Expand All @@ -162,7 +162,7 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
for _, m := range eventMsgs {
lm := len(m)
bytesUsed += lm + overhead
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
metrics.AddSampleWithLabels([]string{"serf", "msgs", "sent"}, float32(lm), d.serf.keyManager.serf.metricLabels)
}
msgs = append(msgs, eventMsgs...)
}
Expand Down
4 changes: 2 additions & 2 deletions serf/ping_delegate.go
Expand Up @@ -68,7 +68,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
before := p.serf.coordClient.GetCoordinate()
after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
if err != nil {
metrics.IncrCounter([]string{"serf", "coordinate", "rejected"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "coordinate", "rejected"}, 1, p.serf.metricLabels)
p.serf.logger.Printf("[TRACE] serf: Rejected coordinate from %s: %v\n",
other.Name, err)
return
Expand All @@ -77,7 +77,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
metrics.AddSampleWithLabels([]string{"serf", "coordinate", "adjustment-ms"}, d, p.serf.metricLabels)

// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
Expand Down
37 changes: 23 additions & 14 deletions serf/serf.go
Expand Up @@ -105,6 +105,9 @@ type Serf struct {
coordClient *coordinate.Client
coordCache map[string]*coordinate.Coordinate
coordCacheLock sync.RWMutex

// metricLabels is the slice of labels to put on all emitted metrics
metricLabels []metrics.Label
}

// SerfState is the state of the Serf instance.
Expand Down Expand Up @@ -270,6 +273,7 @@ func Create(conf *Config) (*Serf, error) {
queryResponse: make(map[LamportTime]*QueryResponse),
shutdownCh: make(chan struct{}),
state: SerfAlive,
metricLabels: conf.MetricLabels,
}
serf.eventJoinIgnore.Store(false)

Expand Down Expand Up @@ -313,7 +317,9 @@ func Create(conf *Config) (*Serf, error) {

// Set up network coordinate client.
if !conf.DisableCoordinates {
serf.coordClient, err = coordinate.NewClient(coordinate.DefaultConfig())
coordinateConfig := coordinate.DefaultConfig()
coordinateConfig.MetricLabels = serf.metricLabels
serf.coordClient, err = coordinate.NewClient(coordinateConfig)
if err != nil {
return nil, fmt.Errorf("Failed to create coordinate client: %v", err)
}
Expand All @@ -334,6 +340,7 @@ func Create(conf *Config) (*Serf, error) {
if err != nil {
return nil, fmt.Errorf("Failed to setup snapshot: %v", err)
}
snap.metricLabels = serf.metricLabels
serf.snapshotter = snap
conf.EventCh = eventCh
prev = snap.AliveNodes()
Expand Down Expand Up @@ -404,6 +411,8 @@ func Create(conf *Config) (*Serf, error) {
conf.MemberlistConfig.Alive = md
}

conf.MemberlistConfig.MetricLabels = conf.MetricLabels

// Create the underlying memberlist that will manage membership
// and failure detection for the Serf instance.
memberlist, err := memberlist.Create(conf.MemberlistConfig)
Expand Down Expand Up @@ -953,7 +962,7 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
oldStatus = member.Status
deadTime := time.Now().Sub(member.leaveTime)
if oldStatus == StatusFailed && deadTime < s.config.FlapTimeout {
metrics.IncrCounter([]string{"serf", "member", "flap"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "member", "flap"}, 1, s.metricLabels)
}

member.Status = StatusAlive
Expand All @@ -980,7 +989,7 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
}

// Update some metrics
metrics.IncrCounter([]string{"serf", "member", "join"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "member", "join"}, 1, s.metricLabels)

// Send an event along
s.logger.Printf("[INFO] serf: EventMemberJoin: %s %s",
Expand Down Expand Up @@ -1030,7 +1039,7 @@ func (s *Serf) handleNodeLeave(n *memberlist.Node) {
}

// Update some metrics
metrics.IncrCounter([]string{"serf", "member", member.Status.String()}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "member", member.Status.String()}, 1, s.metricLabels)

s.logger.Printf("[INFO] serf: %s: %s %s",
eventStr, member.Member.Name, member.Member.Addr)
Expand Down Expand Up @@ -1074,7 +1083,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
member.DelegateCur = n.DCur

// Update some metrics
metrics.IncrCounter([]string{"serf", "member", "update"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "member", "update"}, 1, s.metricLabels)

// Send an event along
s.logger.Printf("[INFO] serf: EventMemberUpdate: %s", member.Member.Name)
Expand Down Expand Up @@ -1272,8 +1281,8 @@ func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
seen.Events = append(seen.Events, userEvent)

// Update some metrics
metrics.IncrCounter([]string{"serf", "events"}, 1)
metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "events"}, 1, s.metricLabels)
metrics.IncrCounterWithLabels([]string{"serf", "events", eventMsg.Name}, 1, s.metricLabels)

if s.config.EventCh != nil {
s.config.EventCh <- UserEvent{
Expand Down Expand Up @@ -1331,8 +1340,8 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
seen.QueryIDs = append(seen.QueryIDs, query.ID)

// Update some metrics
metrics.IncrCounter([]string{"serf", "queries"}, 1)
metrics.IncrCounter([]string{"serf", "queries", query.Name}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "queries"}, 1, s.metricLabels)
metrics.IncrCounterWithLabels([]string{"serf", "queries", query.Name}, 1, s.metricLabels)

// Check if we should rebroadcast, this may be disabled by a flag
rebroadcast := true
Expand Down Expand Up @@ -1419,23 +1428,23 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
if resp.Ack() {
// Exit early if this is a duplicate ack
if _, ok := query.acks[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "query_duplicate_acks"}, 1, s.metricLabels)
return
}

metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "query_acks"}, 1, s.metricLabels)
err := query.sendAck(resp)
if err != nil {
s.logger.Printf("[WARN] %v", err)
}
} else {
// Exit early if this is a duplicate response
if _, ok := query.responses[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "query_duplicate_responses"}, 1, s.metricLabels)
return
}

metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
metrics.IncrCounterWithLabels([]string{"serf", "query_responses"}, 1, s.metricLabels)
err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
if err != nil {
s.logger.Printf("[WARN] %v", err)
Expand Down Expand Up @@ -1676,7 +1685,7 @@ func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQue
select {
case <-time.After(s.config.QueueCheckInterval):
numq := queue.NumQueued()
metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
metrics.AddSampleWithLabels([]string{"serf", "queue", name}, float32(numq), s.metricLabels)
if numq >= s.config.QueueDepthWarning {
s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
}
Expand Down
5 changes: 3 additions & 2 deletions serf/snapshot.go
Expand Up @@ -78,6 +78,7 @@ type Snapshotter struct {
shutdownCh <-chan struct{}
waitCh chan struct{}
lastAttemptedCompaction time.Time
metricLabels []metrics.Label
}

// PreviousNode is used to represent the previously known alive nodes
Expand Down Expand Up @@ -390,7 +391,7 @@ func (s *Snapshotter) tryAppend(l string) {

// appendLine is used to append a line to the existing log
func (s *Snapshotter) appendLine(l string) error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "appendLine"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"serf", "snapshot", "appendLine"}, time.Now(), s.metricLabels)

n, err := s.buffered.WriteString(l)
if err != nil {
Expand Down Expand Up @@ -429,7 +430,7 @@ func (s *Snapshotter) snapshotMaxSize() int64 {

// Compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"serf", "snapshot", "compact"}, time.Now(), s.metricLabels)

// Try to open the file to new fiel
newPath := s.path + tmpExt
Expand Down

0 comments on commit 3805ead

Please sign in to comment.