Raft: Drop append entries when upper layer is overloaded
Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander committed Dec 14, 2023
1 parent d0f44f4 commit 358df16
Showing 3 changed files with 93 additions and 8 deletions.
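
A minimal, self-contained sketch of the mechanism this commit introduces may help before reading the hunks: the Raft layer tracks how far the upper layer has fallen behind, via the apply queue depth and the gap between committed and applied indices, and past a threshold it drops inbound append entries rather than queueing more work. All names below are simplified stand-ins, not the real NATS types.

package main

import (
    "fmt"
    "sync/atomic"
)

const overloadThreshold = 8192 // mirrors the new constant in server/raft.go

type entry struct{ index uint64 }

type node struct {
    overload atomic.Bool
    queue    []entry // stand-in for the apply queue
    commit   uint64  // highest committed index
    applied  uint64  // highest index the upper layer has applied
}

// Mirrors the logic of the new updateOverloadState: overloaded when the
// apply queue is deep or the commit/applied gap is wide.
func (n *node) updateOverloadState() {
    n.overload.Store(len(n.queue) >= overloadThreshold ||
        n.commit-n.applied >= uint64(overloadThreshold))
}

// Drops new entries while overloaded instead of queueing more work.
func (n *node) handleAppendEntry(e entry) {
    if n.overload.Load() {
        return
    }
    n.queue = append(n.queue, e)
    n.commit = e.index
    n.updateOverloadState()
}

func main() {
    n := &node{}
    for i := uint64(1); i <= overloadThreshold+10; i++ {
        n.handleAppendEntry(entry{index: i})
    }
    fmt.Println("overloaded:", n.overload.Load()) // true: nothing was applied
}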
37 changes: 33 additions & 4 deletions server/raft.go
@@ -70,6 +70,7 @@ type RaftNode interface {
     LeadChangeC() <-chan bool
     QuitC() <-chan struct{}
     Created() time.Time
+    Overloaded() bool
     Stop()
     Delete()
     Wipe()
@@ -135,6 +136,7 @@ type raft struct {
     werr error // Last write error

     state    atomic.Int32 // RaftState
+    overload atomic.Bool  // Is the upper layer congested?
     hh       hash.Hash64  // Highwayhash, used for snapshots
     snapfile string       // Snapshot filename

@@ -241,6 +243,7 @@ var (
     hbInterval         = hbIntervalDefault
     lostQuorumInterval = lostQuorumIntervalDefault
     lostQuorumCheck    = lostQuorumCheckIntervalDefault
+    overloadThreshold  = 8192
 )

 type RaftConfig struct {
@@ -467,7 +470,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
     }

     // Send nil entry to signal the upper layers we are done doing replay/restore.
-    n.apply.push(nil)
+    n.pushToApply(nil)

     // Make sure to track ourselves.
     n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}
@@ -917,6 +920,7 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
     // Ignore if already applied.
     if index > n.applied {
         n.applied = index
+        n.updateOverloadState()
     }

     // Calculate the number of entries and estimate the byte size that
@@ -1157,7 +1161,7 @@ func (n *raft) setupLastSnapshot() {
     n.pterm = snap.lastTerm
     n.commit = snap.lastIndex
     n.applied = snap.lastIndex
-    n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
+    n.pushToApply(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
     if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
         n.setWriteErrLocked(err)
     }
@@ -1322,6 +1326,24 @@ func (n *raft) Healthy() bool {
     return n.isCurrent(true)
 }

+func (n *raft) Overloaded() bool {
+    if n == nil {
+        return false
+    }
+    return n.overload.Load()
+}
+
+// Pushes to the apply queue and updates the overloaded state. Lock must be held.
+func (n *raft) pushToApply(ce *CommittedEntry) {
+    n.apply.push(ce)
+    n.updateOverloadState()
+}
+
+// Updates the overloaded state. Lock must be held.
+func (n *raft) updateOverloadState() {
+    n.overload.Store(n.apply.len() >= overloadThreshold || n.commit-n.applied >= uint64(overloadThreshold))
+}
+
 // HadPreviousLeader indicates if this group ever had a leader.
 func (n *raft) HadPreviousLeader() bool {
     n.RLock()
@@ -2757,7 +2779,7 @@ func (n *raft) applyCommit(index uint64) error {
         if fpae {
             delete(n.pae, index)
         }
-        n.apply.push(newCommittedEntry(index, committed))
+        n.pushToApply(newCommittedEntry(index, committed))
     } else {
         // If we processed inline update our applied index.
         n.applied = index
@@ -2939,6 +2961,13 @@ func (n *raft) runAsCandidate() {
 // handleAppendEntry handles an append entry from the wire. This function
 // is an internal callback from the "asubj" append entry subscription.
 func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
+    // If we are overwhelmed, i.e. the upper layer is not applying entries
+    // fast enough and our apply queue is building up, start to drop new
+    // append entries instead.
+    if n.Overloaded() {
+        return
+    }
+
     msg = copyBytes(msg)
     if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
         // Push to the new entry channel. From here one of the worker
@@ -3258,7 +3287,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
     }

     // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
-    n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
+    n.pushToApply(newCommittedEntry(n.commit, ae.entries[:1]))
     n.Unlock()
     return

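Dropping inbound append entries is safe in textbook Raft terms: every AppendEntries message carries the index and term of the entry preceding the batch, so a follower that dropped messages fails that consistency check on the next delivery, and the leader backs up and resends. Overload therefore costs latency, not committed data. A rough sketch of the check, generic Raft rather than code from this diff:

package main

import "fmt"

type entry struct{ index, term uint64 }

type follower struct {
    log map[uint64]entry
}

// onAppend sketches Raft's AppendEntries consistency check: if the entry
// preceding the batch is missing or has the wrong term, reject, which
// signals the leader to rewind and resend from an earlier index.
func (f *follower) onAppend(prevIndex, prevTerm uint64, entries []entry) bool {
    if prevIndex > 0 {
        prev, ok := f.log[prevIndex]
        if !ok || prev.term != prevTerm {
            return false
        }
    }
    for _, e := range entries {
        f.log[e.index] = e
    }
    return true
}

func main() {
    f := &follower{log: map[uint64]entry{}}
    fmt.Println(f.onAppend(0, 0, []entry{{1, 1}})) // true: first entry accepted
    fmt.Println(f.onAppend(5, 1, []entry{{6, 1}})) // false: entries 2-5 were dropped
}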
32 changes: 28 additions & 4 deletions server/raft_helpers_test.go
@@ -83,6 +83,27 @@ func (sg smGroup) nonLeader() stateMachine {
     return nil
 }

+// Causes the upper layer to purposefully block on receipt of
+// append entries until unwedge is called, simulating the scenario
+// that the upper layer is stuck on processing something.
+// Note that this is different from PauseApply, which stops the
+// Raft layer from sending applies to the upper layer at all.
+func (sg smGroup) wedge() {
+    for _, n := range sg {
+        n.(*stateAdder).wedge.Lock()
+    }
+}
+
+// Unwedges the upper layer. Any append entries that have built
+// up in the apply queue will start to apply.
+// Note that this is different from ResumeApply, which starts the
+// Raft layer sending applies to the upper layer again.
+func (sg smGroup) unwedge() {
+    for _, n := range sg {
+        n.(*stateAdder).wedge.Unlock()
+    }
+}
+
 // Create a raft group and place on numMembers servers at random.
 func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
     c.t.Helper()
@@ -153,10 +174,11 @@ func smLoop(sm stateMachine) {
 // The adder state just sums up int64 values.
 type stateAdder struct {
     sync.Mutex
-    s   *Server
-    n   RaftNode
-    cfg *RaftConfig
-    sum int64
+    s     *Server
+    n     RaftNode
+    cfg   *RaftConfig
+    sum   int64
+    wedge sync.Mutex
 }

 // Simple getters for server and the raft node.
@@ -178,6 +200,8 @@ func (a *stateAdder) propose(data []byte) {
 }

 func (a *stateAdder) applyEntry(ce *CommittedEntry) {
+    a.wedge.Lock()
+    defer a.wedge.Unlock()
     a.Lock()
     defer a.Unlock()
     if ce == nil {
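The wedge/unwedge helpers above amount to a second mutex that the test holds so that applyEntry blocks, simulating an upper layer stuck mid-apply. The same pattern in isolation, with illustrative names:

package main

import (
    "fmt"
    "sync"
    "time"
)

type consumer struct {
    wedge sync.Mutex // held by the test to simulate a stalled upper layer
    sum   int
}

func (c *consumer) apply(v int) {
    c.wedge.Lock() // blocks for as long as the test keeps us wedged
    defer c.wedge.Unlock()
    c.sum += v
}

func main() {
    c := &consumer{}
    c.wedge.Lock() // wedge: apply() will now block
    done := make(chan struct{})
    go func() {
        c.apply(1)
        close(done)
    }()
    select {
    case <-done:
        fmt.Println("unexpected: applied while wedged")
    case <-time.After(10 * time.Millisecond):
        fmt.Println("apply is blocked while wedged") // expected path
    }
    c.wedge.Unlock() // unwedge: the blocked apply now proceeds
    <-done
    fmt.Println("sum after unwedge:", c.sum) // 1
}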
32 changes: 32 additions & 0 deletions server/raft_test.go
@@ -293,3 +293,35 @@ func TestNRGSimpleElection(t *testing.T) {
         require_Equal(t, rn.vote, vr.candidate)
     }
 }
+
+func TestNRGDetectOverload(t *testing.T) {
+    origOverloadThreshold := overloadThreshold
+    defer func() {
+        overloadThreshold = origOverloadThreshold
+    }()
+    overloadThreshold = 8
+    iterations := 32
+
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    rg := c.createRaftGroup("TEST", 3, newStateAdder)
+    rg.waitOnLeader()
+
+    rg.wedge()
+
+    sa := rg.leader().(*stateAdder)
+    sn := sa.node()
+
+    for i := 0; i < iterations; i++ {
+        sa.proposeDelta(1)
+        time.Sleep(time.Millisecond * 5)
+    }
+
+    require_True(t, sn.Overloaded())
+
+    rg.unwedge()
+    rg.waitOnTotal(t, int64(iterations))
+
+    require_False(t, sn.Overloaded())
+}
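
Because Overloaded() is exported on the RaftNode interface, an upper layer could also poll it proactively rather than relying on drops alone. The commit does not do this; a hypothetical producer-side backoff, assuming the interface's existing Propose method, might look like:

// Hypothetical and not part of this commit: wait out overload before
// proposing, then hand off to the existing Propose API.
func proposeWithBackoff(n RaftNode, data []byte) error {
    for n.Overloaded() {
        time.Sleep(5 * time.Millisecond) // let the apply queue drain
    }
    return n.Propose(data)
}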
