From ace424ea865b24a1ea66087d375051687b9fd404 Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Thu, 14 Apr 2022 09:06:40 -0400 Subject: [PATCH] Add HeartbeatTimeout and ElectionTimeout to reloadable config. (#496) --- api.go | 16 ++++++ config.go | 16 +++++- integ_test.go | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++ observer.go | 2 +- raft.go | 26 +++++++++- raft_test.go | 39 +++++++++++++++ 6 files changed, 228 insertions(+), 4 deletions(-) diff --git a/api.go b/api.go index 82c279609..5c17bd6df 100644 --- a/api.go +++ b/api.go @@ -201,6 +201,12 @@ type Raft struct { // leadershipTransferCh is used to start a leadership transfer from outside of // the main thread. leadershipTransferCh chan *leadershipTransferFuture + + // leaderNotifyCh is used to tell leader that config has changed + leaderNotifyCh chan struct{} + + // followerNotifyCh is used to tell followers that config has changed + followerNotifyCh chan struct{} } // BootstrapCluster initializes a server's storage with the given cluster @@ -545,6 +551,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna bootstrapCh: make(chan *bootstrapFuture), observers: make(map[uint64]*Observer), leadershipTransferCh: make(chan *leadershipTransferFuture, 1), + leaderNotifyCh: make(chan struct{}, 1), + followerNotifyCh: make(chan struct{}, 1), } r.conf.Store(*conf) @@ -696,6 +704,14 @@ func (r *Raft) ReloadConfig(rc ReloadableConfig) error { return err } r.conf.Store(newCfg) + + if rc.HeartbeatTimeout < oldCfg.HeartbeatTimeout { + // On leader, ensure replication loops running with a longer + // timeout than what we want now discover the change. + asyncNotifyCh(r.leaderNotifyCh) + // On follower, update current timer to use the shorter new value. + asyncNotifyCh(r.followerNotifyCh) + } return nil } diff --git a/config.go b/config.go index ef1f7adaa..8df4ae74e 100644 --- a/config.go +++ b/config.go @@ -260,6 +260,14 @@ type ReloadableConfig struct { // we perform a snapshot. This is to prevent excessive snapshots when we can // just replay a small set of logs. SnapshotThreshold uint64 + + // HeartbeatTimeout specifies the time in follower state without + // a leader before we attempt an election. + HeartbeatTimeout time.Duration + + // ElectionTimeout specifies the time in candidate state without + // a leader before we attempt an election. + ElectionTimeout time.Duration } // apply sets the reloadable fields on the passed Config to the values in @@ -269,6 +277,8 @@ func (rc *ReloadableConfig) apply(to Config) Config { to.TrailingLogs = rc.TrailingLogs to.SnapshotInterval = rc.SnapshotInterval to.SnapshotThreshold = rc.SnapshotThreshold + to.HeartbeatTimeout = rc.HeartbeatTimeout + to.ElectionTimeout = rc.ElectionTimeout return to } @@ -277,6 +287,8 @@ func (rc *ReloadableConfig) fromConfig(from Config) { rc.TrailingLogs = from.TrailingLogs rc.SnapshotInterval = from.SnapshotInterval rc.SnapshotThreshold = from.SnapshotThreshold + rc.HeartbeatTimeout = from.HeartbeatTimeout + rc.ElectionTimeout = from.ElectionTimeout } // DefaultConfig returns a Config with usable defaults. @@ -334,10 +346,10 @@ func ValidateConfig(config *Config) error { return fmt.Errorf("LeaderLeaseTimeout is too low") } if config.LeaderLeaseTimeout > config.HeartbeatTimeout { - return fmt.Errorf("LeaderLeaseTimeout cannot be larger than heartbeat timeout") + return fmt.Errorf("LeaderLeaseTimeout (%s) cannot be larger than heartbeat timeout (%s)", config.LeaderLeaseTimeout, config.HeartbeatTimeout) } if config.ElectionTimeout < config.HeartbeatTimeout { - return fmt.Errorf("ElectionTimeout must be equal or greater than Heartbeat Timeout") + return fmt.Errorf("ElectionTimeout (%s) must be equal or greater than Heartbeat Timeout (%s)", config.ElectionTimeout, config.HeartbeatTimeout) } return nil } diff --git a/integ_test.go b/integ_test.go index 310e5868e..ad158334a 100644 --- a/integ_test.go +++ b/integ_test.go @@ -5,10 +5,12 @@ import ( "fmt" "io/ioutil" "os" + "sync/atomic" "testing" "time" "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" ) // CheckInteg will skip a test if integration testing is not enabled. @@ -355,3 +357,134 @@ func TestRaft_Integ(t *testing.T) { e.Release() } } + +func TestRaft_RestartFollower_LongInitialHeartbeat(t *testing.T) { + CheckInteg(t) + tests := []struct { + name string + restartInitialTimeouts time.Duration + expectNewLeader bool + }{ + {"Default", 0, true}, + {"InitialHigher", time.Second, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := DefaultConfig() + conf.LocalID = ServerID("first") + conf.HeartbeatTimeout = 50 * time.Millisecond + conf.ElectionTimeout = 50 * time.Millisecond + conf.LeaderLeaseTimeout = 50 * time.Millisecond + conf.CommitTimeout = 5 * time.Millisecond + conf.SnapshotThreshold = 100 + conf.TrailingLogs = 10 + + // Create a single node + env1 := MakeRaft(t, conf, true) + NoErr(WaitFor(env1, Leader), t) + + // Join a few nodes! + var envs []*RaftEnv + for i := 0; i < 2; i++ { + conf.LocalID = ServerID(fmt.Sprintf("next-batch-%d", i)) + env := MakeRaft(t, conf, false) + addr := env.trans.LocalAddr() + NoErr(WaitFuture(env1.raft.AddVoter(conf.LocalID, addr, 0, 0), t), t) + envs = append(envs, env) + } + allEnvs := append([]*RaftEnv{env1}, envs...) + + // Wait for a leader + _, err := WaitForAny(Leader, append([]*RaftEnv{env1}, envs...)) + NoErr(err, t) + + CheckConsistent(append([]*RaftEnv{env1}, envs...), t) + // TODO without this sleep, the restarted follower doesn't have any stored config + // and aborts the election because it doesn't know of any peers. Shouldn't + // CheckConsistent prevent that? + time.Sleep(time.Second) + + // shutdown a follower + disconnected := envs[len(envs)-1] + disconnected.logger.Info("stopping follower") + disconnected.Shutdown() + + seeNewLeader := func(o *Observation) bool { _, ok := o.Data.(LeaderObservation); return ok } + leaderCh := make(chan Observation) + // TODO Closing this channel results in panics, even though we're calling Release. + //defer close(leaderCh) + leaderChanges := new(uint32) + go func() { + for range leaderCh { + atomic.AddUint32(leaderChanges, 1) + } + }() + + requestVoteCh := make(chan Observation) + seeRequestVote := func(o *Observation) bool { _, ok := o.Data.(RequestVoteRequest); return ok } + requestVotes := new(uint32) + go func() { + for range requestVoteCh { + atomic.AddUint32(requestVotes, 1) + } + }() + + for _, env := range allEnvs { + env.raft.RegisterObserver(NewObserver(leaderCh, false, seeNewLeader)) + } + + // Unfortunately we need to wait for the leader to start backing off RPCs to the down follower + // such that when the follower comes back up it'll run an election before it gets an rpc from + // the leader + time.Sleep(time.Second * 5) + + if tt.restartInitialTimeouts != 0 { + disconnected.conf.HeartbeatTimeout = tt.restartInitialTimeouts + disconnected.conf.ElectionTimeout = tt.restartInitialTimeouts + } + disconnected.logger.Info("restarting follower") + disconnected.Restart(t) + + time.Sleep(time.Second * 2) + + if tt.expectNewLeader { + require.NotEqual(t, 0, atomic.LoadUint32(leaderChanges)) + } else { + require.Equal(t, uint32(0), atomic.LoadUint32(leaderChanges)) + } + + if tt.restartInitialTimeouts != 0 { + for _, env := range envs { + env.raft.RegisterObserver(NewObserver(requestVoteCh, false, seeRequestVote)) + NoErr(env.raft.ReloadConfig(ReloadableConfig{ + TrailingLogs: conf.TrailingLogs, + SnapshotInterval: conf.SnapshotInterval, + SnapshotThreshold: conf.SnapshotThreshold, + HeartbeatTimeout: 250 * time.Millisecond, + ElectionTimeout: 250 * time.Millisecond, + }), t) + } + // Make sure that reload by itself doesn't trigger a vote + time.Sleep(300 * time.Millisecond) + require.Equal(t, uint32(0), atomic.LoadUint32(requestVotes)) + + // Stop the leader, ensure that we don't see a request vote within the first 50ms + // (original config of the non-restarted follower), but that we do see one within + // the 250ms both followers should now be using for heartbeat timeout. Well, not + // quite: we wait for two heartbeat intervals (plus a fudge factor), because the + // first time around, last contact will have been recent enough that no vote will + // be triggered. + env1.logger.Info("stopping leader") + env1.Shutdown() + time.Sleep(50 * time.Millisecond) + require.Equal(t, uint32(0), atomic.LoadUint32(requestVotes)) + time.Sleep(600 * time.Millisecond) + require.NotEqual(t, uint32(0), atomic.LoadUint32(requestVotes)) + } + + for _, e := range allEnvs { + e.Release() + } + }) + } +} diff --git a/observer.go b/observer.go index c45e7f632..630c50c9d 100644 --- a/observer.go +++ b/observer.go @@ -10,7 +10,7 @@ type Observation struct { // Raft holds the Raft instance generating the observation. Raft *Raft // Data holds observation-specific data. Possible types are - // *RequestVoteRequest + // RequestVoteRequest // RaftState // PeerObservation // LeaderObservation diff --git a/raft.go b/raft.go index b442aea00..1c7aecd89 100644 --- a/raft.go +++ b/raft.go @@ -190,6 +190,12 @@ func (r *Raft) runFollower() { case b := <-r.bootstrapCh: b.respond(r.liveBootstrap(b.configuration)) + case <-r.leaderNotifyCh: + // Ignore since we are not the leader + + case <-r.followerNotifyCh: + heartbeatTimer = time.After(0) + case <-heartbeatTimer: // Restart the heartbeat timer hbTimeout := r.config().HeartbeatTimeout @@ -275,7 +281,8 @@ func (r *Raft) runCandidate() { // otherwise. defer func() { r.candidateFromLeadershipTransfer = false }() - electionTimer := randomTimeout(r.config().ElectionTimeout) + electionTimeout := r.config().ElectionTimeout + electionTimer := randomTimeout(electionTimeout) // Tally the votes, need a simple majority grantedVotes := 0 @@ -337,6 +344,15 @@ func (r *Raft) runCandidate() { case b := <-r.bootstrapCh: b.respond(ErrCantBootstrap) + case <-r.leaderNotifyCh: + // Ignore since we are not the leader + + case <-r.followerNotifyCh: + if electionTimeout != r.config().ElectionTimeout { + electionTimeout = r.config().ElectionTimeout + electionTimer = randomTimeout(electionTimeout) + } + case <-electionTimer: // Election failed! Restart the election. We simply return, // which will kick us back into runCandidate @@ -826,6 +842,14 @@ func (r *Raft) leaderLoop() { // Renew the lease timer lease = time.After(checkInterval) + case <-r.leaderNotifyCh: + for _, repl := range r.leaderState.replState { + asyncNotifyCh(repl.notifyCh) + } + + case <-r.followerNotifyCh: + // Ignore since we are not a follower + case <-r.shutdownCh: return } diff --git a/raft_test.go b/raft_test.go index 6af480eb1..8a11ced8e 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2466,6 +2466,7 @@ func TestRaft_CacheLogWithStoreError(t *testing.T) { func TestRaft_ReloadConfig(t *testing.T) { conf := inmemConfig(t) + conf.LeaderLeaseTimeout = 40 * time.Millisecond c := MakeCluster(1, t, conf) defer c.Close() raft := c.rafts[0] @@ -2480,6 +2481,8 @@ func TestRaft_ReloadConfig(t *testing.T) { TrailingLogs: 12345, SnapshotInterval: 234 * time.Second, SnapshotThreshold: 6789, + HeartbeatTimeout: 45 * time.Millisecond, + ElectionTimeout: 46 * time.Millisecond, } require.NoError(t, raft.ReloadConfig(newCfg)) @@ -2488,6 +2491,8 @@ func TestRaft_ReloadConfig(t *testing.T) { require.Equal(t, newCfg.TrailingLogs, raft.config().TrailingLogs) require.Equal(t, newCfg.SnapshotInterval, raft.config().SnapshotInterval) require.Equal(t, newCfg.SnapshotThreshold, raft.config().SnapshotThreshold) + require.Equal(t, newCfg.HeartbeatTimeout, raft.config().HeartbeatTimeout) + require.Equal(t, newCfg.ElectionTimeout, raft.config().ElectionTimeout) } func TestRaft_ReloadConfigValidates(t *testing.T) { @@ -2776,3 +2781,37 @@ func TestRaft_runFollower_State_Transition(t *testing.T) { }) } } + +func TestRaft_runFollower_ReloadTimeoutConfigs(t *testing.T) { + conf := DefaultConfig() + conf.LocalID = ServerID("first") + conf.HeartbeatTimeout = 500 * time.Millisecond + conf.ElectionTimeout = 500 * time.Millisecond + conf.LeaderLeaseTimeout = 50 * time.Millisecond + conf.CommitTimeout = 5 * time.Millisecond + conf.SnapshotThreshold = 100 + conf.TrailingLogs = 10 + conf.skipStartup = true + + env := MakeRaft(t, conf, false) + servers := []Server{{Voter, "first", ""}} + env.raft.setLatestConfiguration(Configuration{Servers: servers}, 1) + env.raft.setState(Follower) + + // run the follower loop exclusively + go env.raft.runFollower() + + newCfg := ReloadableConfig{ + TrailingLogs: conf.TrailingLogs, + SnapshotInterval: conf.SnapshotInterval, + SnapshotThreshold: conf.SnapshotThreshold, + HeartbeatTimeout: 50 * time.Millisecond, + ElectionTimeout: 50 * time.Millisecond, + } + require.NoError(t, env.raft.ReloadConfig(newCfg)) + // wait enough time to have HeartbeatTimeout + time.Sleep(3 * newCfg.HeartbeatTimeout) + + // Check the follower loop set the right state + require.Equal(t, Candidate, env.raft.getState()) +}