Skip to content

Commit

Permalink
NRG: Ignore vote requests if leader heard more recently than election timeout
Browse files Browse the repository at this point in the history

This is actually a safeguard that the Raft paper prescribes as a way of preventing
network-partitioned nodes that come back up with a high term number from causing
the existing leader to step down unnecessarily.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Apr 24, 2024
1 parent 163f02c commit 786711e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 26 deletions.
26 changes: 23 additions & 3 deletions server/raft.go
Expand Up @@ -162,9 +162,10 @@ type raft struct {
applied uint64 // Sequence number of the most recently applied commit
hcbehind bool // Were we falling behind at the last health check? (see: isCurrent)

leader string // The ID of the leader
vote string // Our current vote state
lxfer bool // Are we doing a leadership transfer?
leader string // The ID of the leader
vote string // Our current vote state
lxfer bool // Are we doing a leadership transfer?
llae time.Time // Leader last append entry

s *Server // Reference to top-level server
c *client // Internal client for subscriptions
Expand Down Expand Up @@ -3107,6 +3108,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return
}

// Keep a track of when we last heard from who we believe to be the leader.
if ae.leader == n.leader {
n.llae = time.Now()
n.resetElectionTimeout()
}

// Scratch buffer for responses.
var scratch [appendEntryResponseLen]byte
arbuf := scratch[:]
Expand Down Expand Up @@ -3880,6 +3887,19 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {

n.Lock()

if n.leader != "" {
// A node that comes up after state reset or being in its own network partition
// for a long time might come back with a very high term number but potentially
// be behind in the log. The Raft paper addresses this in section 6 by suggesting
// that we should ignore vote requests if we think there's a valid leader still
// around so that it doesn't get forced to step down in that case.
if time.Since(n.llae) < minElectionTimeout {
// If we've heard from our leader recently then we should ignore a vote request.
n.Unlock()
return nil
}
}

vresp := &voteResponse{n.term, n.id, false}
defer n.debug("Sending a voteResponse %+v -> %q", vresp, vr.reply)

Expand Down
58 changes: 35 additions & 23 deletions server/raft_test.go
Expand Up @@ -16,6 +16,7 @@ package server
import (
"math"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -234,61 +235,72 @@ func TestNRGSimpleElection(t *testing.T) {

voteReqs := make(chan *nats.Msg, 1)
voteResps := make(chan *nats.Msg, len(rg)-1)

// Keep a record of the term when we started.
leader := rg.leader().node().(*raft)
startTerm := leader.term

// Subscribe to the vote request subject, this should be the
// same across all nodes in the group.
_, err := nc.ChanSubscribe(leader.vsubj, voteReqs)
vss, err := nc.ChanSubscribe(leader.vsubj, voteReqs)
require_NoError(t, err)
defer vss.Unsubscribe()

// Subscribe to all of the vote response inboxes for all nodes
// in the Raft group, as they can differ.
for _, n := range rg {
rn := n.node().(*raft)
_, err = nc.ChanSubscribe(rn.vreply, voteResps)
vrs, err := nc.ChanSubscribe(rn.vreply, voteResps)
require_NoError(t, err)
defer vrs.Unsubscribe()
}

// Step down, this will start a new voting session.
require_NoError(t, rg.leader().node().StepDown())

// Wait for a vote request to come in.
msg := require_ChanRead(t, voteReqs, time.Second)
vr := decodeVoteRequest(msg.Data, msg.Reply)
require_True(t, vr != nil)
require_NotEqual(t, vr.candidate, "")

// The leader should have bumped their term in order to start
// an election.
require_Equal(t, vr.term, startTerm+1)
require_Equal(t, vr.lastTerm, startTerm)
// Start tracking incoming vote requests.
var vr *voteRequest
var vrmu sync.Mutex
go func() {
for msg := range voteReqs {
vrmu.Lock()
vr = decodeVoteRequest(msg.Data, msg.Reply)
t.Logf("VR -> %+v", vr)
vrmu.Unlock()
}
}()

// Wait for all of the vote responses to come in. There should
// be as many vote responses as there are followers.
for i := 0; i < len(rg)-1; i++ {
msg := require_ChanRead(t, voteResps, time.Second)
msg := require_ChanRead(t, voteResps, maxElectionTimeout+time.Second)
re := decodeVoteResponse(msg.Data)
require_True(t, re != nil)

vrmu.Lock()
vrterm := vr.term
vrmu.Unlock()
if re.term != vrterm {
continue
}

t.Logf("%s -> %+v", msg.Reply, re)

// The vote should have been granted.
require_Equal(t, re.granted, true)
// require_Equal(t, re.granted, true)

// The node granted the vote, therefore the term in the vote
// response should have advanced as well.
require_Equal(t, re.term, vr.term)
require_Equal(t, re.term, startTerm+1)
require_Equal(t, re.term, vrterm)
//require_Equal(t, re.term, startTerm+1)
}

// Everyone in the group should have voted for our candidate
// and arrived at the term from the vote request.
// The majority of the group hopefully voted for our candidate
// and arrived at the leader & term from the vote request.

vrmu.Lock()
defer vrmu.Unlock()
for _, n := range rg {
rn := n.node().(*raft)
require_Equal(t, rn.term, vr.term)
require_Equal(t, rn.term, startTerm+1)
require_Equal(t, rn.vote, vr.candidate)
require_Equal(t, rn.leader, vr.candidate)
}
}

Expand Down

0 comments on commit 786711e

Please sign in to comment.