From 44124c28758b8cfb675e90c75a204a08a84f8d4f Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Wed, 27 Apr 2022 15:45:37 +0100 Subject: [PATCH] =?UTF-8?q?Thread=20saturation=20metrics=20=F0=9F=93=88=20?= =?UTF-8?q?(#489)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds metrics suggested in #488, to record the percentage of time the main and FSM goroutines are busy with work vs available to accept new work, to give operators an idea of how close they are to hitting capacity limits. --- api.go | 4 ++ fsm.go | 8 +++ raft.go | 52 +++++++++++++--- saturation.go | 111 ++++++++++++++++++++++++++++++++++ saturation_test.go | 144 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 311 insertions(+), 8 deletions(-) create mode 100644 saturation.go create mode 100644 saturation_test.go diff --git a/api.go b/api.go index 5c17bd6df..eda33c3f0 100644 --- a/api.go +++ b/api.go @@ -207,6 +207,9 @@ type Raft struct { // followerNotifyCh is used to tell followers that config has changed followerNotifyCh chan struct{} + + // mainThreadSaturation measures the saturation of the main raft goroutine. + mainThreadSaturation *saturationMetric } // BootstrapCluster initializes a server's storage with the given cluster @@ -553,6 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna leadershipTransferCh: make(chan *leadershipTransferFuture, 1), leaderNotifyCh: make(chan struct{}, 1), followerNotifyCh: make(chan struct{}, 1), + mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second), } r.conf.Store(*conf) diff --git a/fsm.go b/fsm.go index 487cb4b78..6d26a9a28 100644 --- a/fsm.go +++ b/fsm.go @@ -223,9 +223,15 @@ func (r *Raft) runFSM() { req.respond(err) } + saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second) + for { + saturation.sleeping() + select { case ptr := <-r.fsmMutateCh: + saturation.working() + switch req := ptr.(type) { case []*commitTuple: applyBatch(req) @@ -238,6 +244,8 @@ func (r *Raft) runFSM() { } case req := <-r.fsmSnapshotCh: + saturation.working() + snapshot(req) case <-r.shutdownCh: diff --git a/raft.go b/raft.go index 4b85ac1ef..a509b3b3d 100644 --- a/raft.go +++ b/raft.go @@ -159,35 +159,45 @@ func (r *Raft) runFollower() { heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) for r.getState() == Follower { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case c := <-r.configurationChangeCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader v.respond(ErrNotLeader) - case r := <-r.userRestoreCh: + case ur := <-r.userRestoreCh: + r.mainThreadSaturation.working() // Reject any restores since we are not the leader - r.respond(ErrNotLeader) + ur.respond(ErrNotLeader) - case r := <-r.leadershipTransferCh: + case l := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader - r.respond(ErrNotLeader) + l.respond(ErrNotLeader) case c := <-r.configurationsCh: + r.mainThreadSaturation.working() c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(r.liveBootstrap(b.configuration)) case <-r.leaderNotifyCh: @@ -197,6 +207,7 @@ func (r *Raft) runFollower() { heartbeatTimer = time.After(0) case <-heartbeatTimer: + r.mainThreadSaturation.working() // Restart the heartbeat timer hbTimeout := r.config().HeartbeatTimeout heartbeatTimer = randomTimeout(hbTimeout) @@ -290,11 +301,15 @@ func (r *Raft) runCandidate() { r.logger.Debug("votes", "needed", votesNeeded) for r.getState() == Candidate { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case vote := <-voteCh: + r.mainThreadSaturation.working() // Check if the term is greater than ours, bail if vote.Term > r.getCurrentTerm() { r.logger.Debug("newer term discovered, fallback to follower") @@ -318,30 +333,37 @@ func (r *Raft) runCandidate() { } case c := <-r.configurationChangeCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader v.respond(ErrNotLeader) - case r := <-r.userRestoreCh: + case ur := <-r.userRestoreCh: + r.mainThreadSaturation.working() // Reject any restores since we are not the leader - r.respond(ErrNotLeader) + ur.respond(ErrNotLeader) - case r := <-r.leadershipTransferCh: + case l := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader - r.respond(ErrNotLeader) + l.respond(ErrNotLeader) case c := <-r.configurationsCh: + r.mainThreadSaturation.working() c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(ErrCantBootstrap) case <-r.leaderNotifyCh: @@ -354,6 +376,7 @@ func (r *Raft) runCandidate() { } case <-electionTimer: + r.mainThreadSaturation.working() // Election failed! Restart the election. We simply return, // which will kick us back into runCandidate r.logger.Warn("Election timeout reached, restarting election") @@ -598,14 +621,19 @@ func (r *Raft) leaderLoop() { lease := time.After(r.config().LeaderLeaseTimeout) for r.getState() == Leader { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case <-r.leaderState.stepDown: + r.mainThreadSaturation.working() r.setState(Follower) case future := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -686,6 +714,7 @@ func (r *Raft) leaderLoop() { go r.leadershipTransfer(*id, *address, state, stopCh, doneCh) case <-r.leaderState.commitCh: + r.mainThreadSaturation.working() // Process the newly committed entries oldCommitIndex := r.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex() @@ -748,6 +777,7 @@ func (r *Raft) leaderLoop() { } case v := <-r.verifyCh: + r.mainThreadSaturation.working() if v.quorumSize == 0 { // Just dispatched, start the verification r.verifyLeader(v) @@ -772,6 +802,7 @@ func (r *Raft) leaderLoop() { } case future := <-r.userRestoreCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -781,6 +812,7 @@ func (r *Raft) leaderLoop() { future.respond(err) case future := <-r.configurationsCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -790,6 +822,7 @@ func (r *Raft) leaderLoop() { future.respond(nil) case future := <-r.configurationChangeChIfStable(): + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -798,9 +831,11 @@ func (r *Raft) leaderLoop() { r.appendConfigurationEntry(future) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(ErrCantBootstrap) case newLog := <-r.applyCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) @@ -829,6 +864,7 @@ func (r *Raft) leaderLoop() { } case <-lease: + r.mainThreadSaturation.working() // Check if we've exceeded the lease, potentially stepping down maxDiff := r.checkLeaderLease() diff --git a/saturation.go b/saturation.go new file mode 100644 index 000000000..c01430f32 --- /dev/null +++ b/saturation.go @@ -0,0 +1,111 @@ +package raft + +import ( + "math" + "time" + + "github.com/armon/go-metrics" +) + +// saturationMetric measures the saturation (percentage of time spent working vs +// waiting for work) of an event processing loop, such as runFSM. It reports the +// saturation as a gauge metric (at most) once every reportInterval. +// +// Callers must instrument their loop with calls to sleeping and working, starting +// with a call to sleeping. +// +// Note: the caller must be single-threaded and saturationMetric is not safe for +// concurrent use by multiple goroutines. +type saturationMetric struct { + reportInterval time.Duration + + // slept contains time for which the event processing loop was sleeping rather + // than working in the period since lastReport. + slept time.Duration + + // lost contains time that is considered lost due to incorrect use of + // saturationMetricBucket (e.g. calling sleeping() or working() multiple + // times in succession) in the period since lastReport. + lost time.Duration + + lastReport, sleepBegan, workBegan time.Time + + // These are overwritten in tests. + nowFn func() time.Time + reportFn func(float32) +} + +// newSaturationMetric creates a saturationMetric that will update the gauge +// with the given name at the given reportInterval. keepPrev determines the +// number of previous measurements that will be used to smooth out spikes. +func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric { + m := &saturationMetric{ + reportInterval: reportInterval, + nowFn: time.Now, + lastReport: time.Now(), + reportFn: func(sat float32) { metrics.AddSample(name, sat) }, + } + return m +} + +// sleeping records the time at which the loop began waiting for work. After the +// initial call it must always be proceeded by a call to working. +func (s *saturationMetric) sleeping() { + now := s.nowFn() + + if !s.sleepBegan.IsZero() { + // sleeping called twice in succession. Count that time as lost rather than + // measuring nonsense. + s.lost += now.Sub(s.sleepBegan) + } + + s.sleepBegan = now + s.workBegan = time.Time{} + s.report() +} + +// working records the time at which the loop began working. It must always be +// proceeded by a call to sleeping. +func (s *saturationMetric) working() { + now := s.nowFn() + + if s.workBegan.IsZero() { + if s.sleepBegan.IsZero() { + // working called before the initial call to sleeping. Count that time as + // lost rather than measuring nonsense. + s.lost += now.Sub(s.lastReport) + } else { + s.slept += now.Sub(s.sleepBegan) + } + } else { + // working called twice in succession. Count that time as lost rather than + // measuring nonsense. + s.lost += now.Sub(s.workBegan) + } + + s.workBegan = now + s.sleepBegan = time.Time{} + s.report() +} + +// report updates the gauge if reportInterval has passed since our last report. +func (s *saturationMetric) report() { + now := s.nowFn() + timeSinceLastReport := now.Sub(s.lastReport) + + if timeSinceLastReport < s.reportInterval { + return + } + + var saturation float64 + total := timeSinceLastReport - s.lost + if total != 0 { + saturation = float64(total-s.slept) / float64(total) + saturation = math.Round(saturation*100) / 100 + } + s.reportFn(float32(saturation)) + + s.slept = 0 + s.lost = 0 + s.lastReport = now +} diff --git a/saturation_test.go b/saturation_test.go new file mode 100644 index 000000000..35046f347 --- /dev/null +++ b/saturation_test.go @@ -0,0 +1,144 @@ +package raft + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSaturationMetric(t *testing.T) { + t.Run("without smoothing", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) + + now := sat.lastReport + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(val float32) { reported = val } + + sat.sleeping() + + // First window: 50ms sleeping + 75ms working. + now = now.Add(50 * time.Millisecond) + sat.working() + + now = now.Add(75 * time.Millisecond) + sat.sleeping() + + // Should be 60% saturation. + require.Equal(t, float32(0.6), reported) + + // Second window: 90ms sleeping + 10ms working. + now = now.Add(90 * time.Millisecond) + sat.working() + + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + // Should be 10% saturation. + require.Equal(t, float32(0.1), reported) + + // Third window: 100ms sleeping + 0ms working. + now = now.Add(100 * time.Millisecond) + sat.working() + + // Should be 0% saturation. + require.Equal(t, float32(0), reported) + }) +} + +func TestSaturationMetric_IncorrectUsage(t *testing.T) { + t.Run("calling sleeping() consecutively", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond) + + now := sat.lastReport + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Calling sleeping() consecutively should reset sleepBegan without recording + // a sample, such that we "lose" time rather than recording nonsense data. + // + // 0 | sleeping() | + // => Sleeping (10ms) + // +10ms | working() | + // => Working (10ms) + // +20ms | sleeping() | + // => [!] LOST [!] (10ms) + // +30ms | sleeping() | + // => Sleeping (10ms) + // +40ms | working() | + // => Working (10ms) + // +50ms | sleeping() | + // + // Total reportable time: 40ms. Saturation: 50%. + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.5), reported) + }) + + t.Run("calling working() consecutively", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond) + + now := sat.lastReport + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Calling working() consecutively should reset workBegan without recording + // a sample, such that we "lose" time rather than recording nonsense data. + // + // 0 | sleeping() | + // => Sleeping (10ms) + // +10ms | working() | + // => [!] LOST [!] (10ms) + // +20ms | working() | + // => Working (10ms) + // +30ms | sleeping() | + // + // Total reportable time: 20ms. Saturation: 50%. + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.5), reported) + }) + + t.Run("calling working() first", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond) + + now := sat.lastReport + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Time from start until working() is treated as lost. + sat.working() + require.Equal(t, float32(0), reported) + + sat.sleeping() + now = now.Add(5 * time.Millisecond) + sat.working() + now = now.Add(5 * time.Millisecond) + sat.sleeping() + require.Equal(t, float32(0.5), reported) + }) +}