Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG (2.11): Remove stepdown channel, handle inline #5344

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 52 additions & 54 deletions server/raft.go
Expand Up @@ -195,15 +195,14 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
stepdown *ipQueue[string] // Stepdown requests
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
}

// catchupState structure that holds our subscription, and catchup term and index
Expand Down Expand Up @@ -385,7 +384,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
stepdown: newIPQueue[string](s, qpfx+"stepdown"),
accName: accName,
leadc: make(chan bool, 1),
observer: cfg.Observer,
Expand Down Expand Up @@ -858,7 +856,7 @@ func (n *raft) PauseApply() error {

// If we are currently a candidate make sure we step down.
if n.State() == Candidate {
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
}

n.debug("Pausing our apply channel")
Expand Down Expand Up @@ -1247,6 +1245,21 @@ func (n *raft) Leader() bool {
return n.State() == Leader
}

// stepdown immediately steps down the Raft node to the follower state.
// It takes the node's lock itself, holds it for the duration of the call
// and delegates the actual work to stepdownLocked. newLeader is passed
// through unchanged to stepdownLocked.
//
// NOTE(review): callers that already hold the lock should presumably call
// stepdownLocked instead, assuming the lock is not reentrant — confirm.
func (n *raft) stepdown(newLeader string) {
n.Lock()
// Deferred so the lock is released on every exit path.
defer n.Unlock()
n.stepdownLocked(newLeader)
}

// stepdownLocked immediately steps down the Raft node to the follower
// state. The caller must already hold the lock; use stepdown when the
// lock is not held. It emits a "Stepping down" debug message and then
// calls switchToFollowerLocked with newLeader.
func (n *raft) stepdownLocked(newLeader string) {
n.debug("Stepping down")
n.switchToFollowerLocked(newLeader)
}

// isCatchingUp returns true if a catchup is currently taking place.
func (n *raft) isCatchingUp() bool {
n.RLock()
Expand Down Expand Up @@ -1454,7 +1467,6 @@ func (n *raft) StepDown(preferred ...string) error {
n.vote = noVote
n.writeTermVote()

stepdown := n.stepdown
prop := n.prop
n.Unlock()

Expand All @@ -1468,8 +1480,7 @@ func (n *raft) StepDown(preferred ...string) error {
prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader)))
} else {
// Force us to stepdown here.
n.debug("Stepping down")
stepdown.push(noLeader)
n.stepdown(noLeader)
}

return nil
Expand Down Expand Up @@ -1640,7 +1651,7 @@ func (n *raft) shutdown(shouldDelete bool) {
// just will remove them from the central monitoring map
queues := []interface {
unregister()
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown}
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.unregister()
}
Expand Down Expand Up @@ -1907,7 +1918,7 @@ func (n *raft) processAppendEntries() {
// runAsFollower is called by run and will block for as long as the node is
// running in the follower state.
func (n *raft) runAsFollower() {
for {
for n.State() == Follower {
elect := n.electTimer()

select {
Expand Down Expand Up @@ -1958,13 +1969,6 @@ func (n *raft) runAsFollower() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
// We've received a stepdown request, start following the new leader if
// we can.
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -2301,14 +2305,14 @@ func (n *raft) runAsLeader() {
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
n.warn("Error subscribing to forwarded proposals: %v", err)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
n.unsubscribe(fsub)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}

Expand Down Expand Up @@ -2363,7 +2367,7 @@ func (n *raft) runAsLeader() {
if b.Type == EntryLeaderTransfer {
n.prop.recycle(&es)
n.debug("Stepping down due to leadership transfer")
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
// We need to re-create `entries` because there is a reference
Expand All @@ -2378,7 +2382,7 @@ func (n *raft) runAsLeader() {
}
case <-lq.C:
if n.lostQuorum() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
case <-n.votes.ch:
Expand All @@ -2388,7 +2392,7 @@ func (n *raft) runAsLeader() {
continue
}
if vresp.term > n.Term() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
n.trackPeer(vresp.peer)
Expand All @@ -2397,11 +2401,6 @@ func (n *raft) runAsLeader() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
case <-n.entry.ch:
n.processAppendEntries()
}
Expand Down Expand Up @@ -2570,7 +2569,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
snap, err := n.loadLastSnapshot()
if err != nil {
// We need to stepdown here when this happens.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
// We need to reset our state here as well.
n.resetWAL()
return 0, err
Expand Down Expand Up @@ -2636,7 +2635,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err)
if err == ErrStoreEOF {
// If we are here we are seeing a request for an item beyond our state, meaning we should stepdown.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand All @@ -2648,7 +2647,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
// If we are here we are seeing a request for an item we do not have, meaning we should stepdown.
// This is possible on a reset of our WAL but the other side has a snapshot already.
// If we do not stepdown this can cycle.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand Down Expand Up @@ -2705,7 +2704,7 @@ func (n *raft) applyCommit(index uint64) error {
if err != ErrStoreClosed && err != ErrStoreEOF {
n.warn("Got an error loading %d index: %v - will reset", index, err)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -2783,7 +2782,7 @@ func (n *raft) applyCommit(index uint64) error {

// If this is us and we are the leader we should attempt to stepdown.
if peer == n.id && n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdown(n.selectNextLeader())
}

// Remove from string intern map.
Expand Down Expand Up @@ -2917,7 +2916,7 @@ func (n *raft) runAsCandidate() {
n.ID(): {},
}

for {
for n.State() == Candidate {
elect := n.electTimer()
select {
case <-n.entry.ch:
Expand Down Expand Up @@ -2960,20 +2959,15 @@ func (n *raft) runAsCandidate() {
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.stepdown.push(noLeader)
n.lxfer = false
n.stepdownLocked(noLeader)
n.Unlock()
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -3131,7 +3125,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.writeTermVote()
}
n.debug("Received append entry from another leader, stepping down to %q", ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
} else {
// Let them know we are the leader.
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
Expand All @@ -3158,7 +3152,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.writeTermVote()
}
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}
}

Expand Down Expand Up @@ -3221,7 +3215,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
if n.State() != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}
}

Expand Down Expand Up @@ -3463,7 +3457,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
n.vote = noVote
n.writeTermVote()
n.warn("Detected another leader with higher term, will stepdown and reset")
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.resetWAL()
n.Unlock()
arPool.Put(ar)
Expand Down Expand Up @@ -3511,7 +3505,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
if index := ae.pindex + 1; index != seq {
n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -3904,7 +3898,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
if n.State() != Follower {
n.debug("Stepping down from %s, detected higher term: %d vs %d",
strings.ToLower(n.State().String()), vr.term, n.term)
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.term = vr.term
}
n.vote = noVote
Expand Down Expand Up @@ -4036,13 +4030,17 @@ const (
)

// switchToFollower takes the node's lock and then calls
// switchToFollowerLocked with the given leader. The lock is released
// when the call returns.
func (n *raft) switchToFollower(leader string) {
n.Lock()
// Deferred so the lock is released on every exit path.
defer n.Unlock()

n.switchToFollowerLocked(leader)
}

func (n *raft) switchToFollowerLocked(leader string) {
if n.State() == Closed {
return
}

n.Lock()
defer n.Unlock()

n.debug("Switching to follower")

n.lxfer = false
Expand Down