Skip to content

Commit

Permalink
Raft: Drop append entries when upper layer is overloaded
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Nov 2, 2023
1 parent f404640 commit d907c97
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 4 deletions.
27 changes: 27 additions & 0 deletions server/raft.go
Expand Up @@ -70,6 +70,7 @@ type RaftNode interface {
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
Created() time.Time
Overloaded() bool
Stop()
Delete()
Wipe()
Expand Down Expand Up @@ -245,6 +246,7 @@ var (
hbInterval = hbIntervalDefault
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
overloadThreshold = 8192
)

type RaftConfig struct {
Expand Down Expand Up @@ -1252,6 +1254,24 @@ func (n *raft) Healthy() bool {
return n.isCurrent(true)
}

func (n *raft) Overloaded() bool {
if n == nil {
return false
}
n.RLock()
defer n.RUnlock()
// If the apply queue is building up then that is the first sign
// of overload.
if n.apply.len() >= overloadThreshold {
return true
}
// The second sign of overload is that there's a growing drift
// between the commit and the applied — after all, the upper layer
// might have popped entries off the apply queue but not processed
// them yet.
return n.commit-n.applied >= uint64(overloadThreshold)
}

// HadPreviousLeader indicates if this group ever had a leader.
func (n *raft) HadPreviousLeader() bool {
n.RLock()
Expand Down Expand Up @@ -2836,6 +2856,13 @@ func (n *raft) runAsCandidate() {

// handleAppendEntry handles an append entry from the wire.
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
// If we are overwhelmed, i.e. the upper layer is not applying entries
// fast enough and our apply queue is building up, start to drop new
// append entries instead.
if n.Overloaded() {
return
}

msg = copyBytes(msg)
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
n.entry.push(ae)
Expand Down
32 changes: 28 additions & 4 deletions server/raft_helpers_test.go
Expand Up @@ -83,6 +83,27 @@ func (sg smGroup) nonLeader() stateMachine {
return nil
}

// Causes the upper layer to purposefully block on receipt of
// append entries until unwedge is called, simulating the scenario
// that the upper layer is stuck on processing something.
// Note that this is different from PauseApply, which stops the
// Raft layer from sending applies to the upper layer at all.
func (sg smGroup) wedge() {
for _, n := range sg {
n.(*stateAdder).wedge.Lock()
}
}

// Unwedges the upper layer. Any append entries that have built
// up in the apply queue will start to apply.
// Note that this is different from ResumeApply, which starts the
// Raft layer sending applies to the upper layer again.
func (sg smGroup) unwedge() {
for _, n := range sg {
n.(*stateAdder).wedge.Unlock()
}
}

// Create a raft group and place on numMembers servers at random.
func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
c.t.Helper()
Expand Down Expand Up @@ -153,10 +174,11 @@ func smLoop(sm stateMachine) {
// The adder state just sums up int64 values.
type stateAdder struct {
sync.Mutex
s *Server
n RaftNode
cfg *RaftConfig
sum int64
s *Server
n RaftNode
cfg *RaftConfig
sum int64
wedge sync.Mutex
}

// Simple getters for server and the raft node.
Expand All @@ -178,6 +200,8 @@ func (a *stateAdder) propose(data []byte) {
}

func (a *stateAdder) applyEntry(ce *CommittedEntry) {
a.wedge.Lock()
defer a.wedge.Unlock()
a.Lock()
defer a.Unlock()
if ce == nil {
Expand Down
32 changes: 32 additions & 0 deletions server/raft_test.go
Expand Up @@ -214,3 +214,35 @@ func TestNRGObserverMode(t *testing.T) {
require_True(t, n.node().IsObserver())
}
}

func TestNRGDetectOverload(t *testing.T) {
origOverloadThreshold := overloadThreshold
defer func() {
overloadThreshold = origOverloadThreshold
}()
overloadThreshold = 8
iterations := 32

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

rg.wedge()

sa := rg.leader().(*stateAdder)
sn := sa.node()

for i := 0; i < iterations; i++ {
sa.proposeDelta(1)
time.Sleep(time.Millisecond * 5)
}

require_True(t, sn.Overloaded())

rg.unwedge()
rg.waitOnTotal(t, int64(iterations))

require_False(t, sn.Overloaded())
}

0 comments on commit d907c97

Please sign in to comment.