Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG: Drop append entries when upper layer is overloaded #4735

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 38 additions & 7 deletions server/ipqueue.go
Expand Up @@ -28,33 +28,46 @@ type ipQueue[T any] struct {
elts []T
pos int
pool *sync.Pool
mrs int
mrs int // Max recycle size
calc func(e T) uint64 // Per-entry size calculator
sz atomic.Uint64 // Calculated size (only if calc != nil)
name string
m *sync.Map
}

type ipQueueOpts struct {
type ipQueueOpts[T any] struct {
maxRecycleSize int
calc func(e T) uint64
}

type ipQueueOpt func(*ipQueueOpts)
type ipQueueOpt[T any] func(*ipQueueOpts[T])

// This option allows setting the maximum recycle size that is used when
// attempting to put a slice back into the pool.
func ipQueue_MaxRecycleSize(max int) ipQueueOpt {
return func(o *ipQueueOpts) {
func ipQueue_MaxRecycleSize[T any](max int) ipQueueOpt[T] {
return func(o *ipQueueOpts[T]) {
o.maxRecycleSize = max
}
}

func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt) *ipQueue[T] {
qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
// ipQueue_SizeCalculation returns an option that installs calc as the
// per-entry size calculator of the queue. With a calculator in place the
// queue keeps a running total of the sizes of its entries as they are
// pushed and popped, and that total is what size() reports.
func ipQueue_SizeCalculation[T any](calc func(e T) uint64) ipQueueOpt[T] {
	setCalc := func(opts *ipQueueOpts[T]) {
		opts.calc = calc
	}
	return setCalc
}

func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt[T]) *ipQueue[T] {
qo := ipQueueOpts[T]{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
for _, o := range opts {
o(&qo)
}
q := &ipQueue[T]{
ch: make(chan struct{}, 1),
mrs: qo.maxRecycleSize,
calc: qo.calc,
pool: &sync.Pool{},
name: name,
m: &s.ipQueues,
Expand Down Expand Up @@ -83,6 +96,9 @@ func (q *ipQueue[T]) push(e T) int {
}
}
q.elts = append(q.elts, e)
if q.calc != nil {
q.sz.Add(q.calc(e))
}
l++
q.Unlock()
if signal {
Expand Down Expand Up @@ -116,6 +132,11 @@ func (q *ipQueue[T]) pop() []T {
}
q.elts, q.pos = nil, 0
atomic.AddInt64(&q.inprogress, int64(len(elts)))
if q.calc != nil {
for _, e := range elts {
q.sz.Add(-q.calc(e))
}
}
q.Unlock()
return elts
}
Expand All @@ -140,6 +161,9 @@ func (q *ipQueue[T]) popOne() (T, bool) {
}
e := q.elts[q.pos]
q.pos++
if q.calc != nil {
q.sz.Add(-q.calc(e))
}
l--
if l > 0 {
// We need to re-signal
Expand Down Expand Up @@ -189,6 +213,12 @@ func (q *ipQueue[T]) len() int {
return l
}

// size reports the running total of the calculated entry sizes currently
// held in the queue. The total is only maintained when a calculator was
// supplied through ipQueue_SizeCalculation; without one it stays at zero.
func (q *ipQueue[T]) size() uint64 {
	total := q.sz.Load()
	return total
}

// Empties the queue and consumes the notification signal if present.
// Note that this could cause a reader go routine that has been
// notified that there is something in the queue (reading from queue's `ch`)
Expand All @@ -202,6 +232,7 @@ func (q *ipQueue[T]) drain() {
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
}
q.sz.Store(0)
// Consume the signal if it was present to reduce the chance of a reader
// routine thinking that there is something in the queue...
select {
Expand Down
31 changes: 29 additions & 2 deletions server/ipqueue_test.go
Expand Up @@ -42,7 +42,7 @@ func TestIPQueueBasic(t *testing.T) {
}

// Try to change the max recycle size
q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize[int](10))
if q2.mrs != 10 {
t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs)
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) {
}
}

q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize[int](10))
for i := 0; i < 100; i++ {
q.push(i)
}
Expand Down Expand Up @@ -389,3 +389,30 @@ func TestIPQueueDrain(t *testing.T) {
}
}
}

func TestIPQueueSizeCalculation(t *testing.T) {
type testType = [16]byte
var testValue testType

calc := ipQueue_SizeCalculation[testType](func(e testType) uint64 {
return uint64(len(e))
})
s := &Server{}
q := newIPQueue[testType](s, "test", calc)

for i := 0; i < 10; i++ {
q.push(testValue)
require_Equal(t, q.len(), i+1)
require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
}

for i := 10; i > 5; i-- {
q.popOne()
require_Equal(t, q.len(), i-1)
require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
}

q.pop()
require_Equal(t, q.len(), 0)
require_Equal(t, q.size(), 0)
}
61 changes: 50 additions & 11 deletions server/raft.go
Expand Up @@ -73,6 +73,7 @@ type RaftNode interface {
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
Created() time.Time
Overloaded() bool
Stop()
Delete()
Wipe()
Expand Down Expand Up @@ -235,13 +236,14 @@ const (
)

var (
minElectionTimeout = minElectionTimeoutDefault
maxElectionTimeout = maxElectionTimeoutDefault
minCampaignTimeout = minCampaignTimeoutDefault
maxCampaignTimeout = maxCampaignTimeoutDefault
hbInterval = hbIntervalDefault
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
minElectionTimeout = minElectionTimeoutDefault
maxElectionTimeout = maxElectionTimeoutDefault
minCampaignTimeout = minCampaignTimeoutDefault
maxCampaignTimeout = maxCampaignTimeoutDefault
hbInterval = hbIntervalDefault
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
overloadThreshold uint64 = 1024 * 1024 * 8
)

type RaftConfig struct {
Expand Down Expand Up @@ -336,6 +338,19 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined})
}

// calculateCommittedEntrySizeForIPQ is the per-entry size calculator handed
// to the ipQueue so that it can track the size of the committed entries it
// holds. A nil entry counts as zero. Otherwise the result is 8 for the
// 64-bit index plus, for each contained entry, one plus the length of its
// data.
func calculateCommittedEntrySizeForIPQ(e *CommittedEntry) uint64 {
	var total uint64
	if e != nil {
		total = 8 // 64-bit index
		for i := range e.Entries {
			// One extra unit per entry on top of its payload
			// (presumably the entry type byte — confirm).
			total += 1 + uint64(len(e.Entries[i].Data))
		}
	}
	return total
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
if cfg == nil {
Expand All @@ -360,6 +375,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
return nil, errNoPeerState
}

calc := ipQueue_SizeCalculation[*CommittedEntry](calculateCommittedEntrySizeForIPQ)
qpfx := fmt.Sprintf("[ACC:%s] RAFT '%s' ", accName, cfg.Name)
n := &raft{
created: time.Now(),
Expand All @@ -386,12 +402,13 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
prop: newIPQueue[*Entry](s, qpfx+"entry"),
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry", calc),
accName: accName,
leadc: make(chan bool, 1),
observer: cfg.Observer,
extSt: ps.domainExt,
}

n.c.registerWithAccount(sacc)

if atomic.LoadInt32(&s.logging.debug) > 0 {
Expand Down Expand Up @@ -473,6 +490,9 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
}
}

// Send nil entry to signal the upper layers we are done doing replay/restore.
n.pushToApply(nil)

// Make sure to track ourselves.
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}

Expand Down Expand Up @@ -1178,7 +1198,7 @@ func (n *raft) setupLastSnapshot() {
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
n.pushToApply(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
Expand Down Expand Up @@ -1375,6 +1395,18 @@ func (n *raft) Healthy() bool {
return n.isCurrent(true)
}

// Overloaded reports whether the calculated size of the apply queue has
// reached overloadThreshold. A nil node is never considered overloaded.
func (n *raft) Overloaded() bool {
	return n != nil && n.apply.size() >= overloadThreshold
}

// Pushes to the apply queue, growing the queue size that Overloaded() checks. Lock must be held.
func (n *raft) pushToApply(ce *CommittedEntry) {
n.apply.push(ce)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function returns pending, so could short circuit some work here.

}

// HadPreviousLeader indicates if this group ever had a leader.
func (n *raft) HadPreviousLeader() bool {
n.RLock()
Expand Down Expand Up @@ -2803,7 +2835,7 @@ func (n *raft) applyCommit(index uint64) error {
if fpae {
delete(n.pae, index)
}
n.apply.push(newCommittedEntry(index, committed))
n.pushToApply(newCommittedEntry(index, committed))
} else {
// If we processed inline update our applied index.
n.applied = index
Expand Down Expand Up @@ -2980,6 +3012,13 @@ func (n *raft) runAsCandidate() {
// handleAppendEntry handles an append entry from the wire. This function
// is an internal callback from the "asubj" append entry subscription.
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
// If we are overwhelmed, i.e. the upper layer is not applying entries
// fast enough and our apply queue is building up, start to drop new
// append entries instead.
if n.Overloaded() {
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just check n.apply.len()? Always present, so do not need to lock the raft group, and ipq underneath has its own which we use here anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overloaded() doesn't take a lock but now just checks the apply queue size.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes the ipq lock, so we are now taking it twice in that function I think.

return
}

msg = copyBytes(msg)
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
// Push to the new entry channel. From here one of the worker
Expand Down Expand Up @@ -3299,7 +3338,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
n.pushToApply(newCommittedEntry(n.commit, ae.entries[:1]))
n.Unlock()
return

Expand Down
32 changes: 28 additions & 4 deletions server/raft_helpers_test.go
Expand Up @@ -83,6 +83,27 @@ func (sg smGroup) nonLeader() stateMachine {
return nil
}

// wedge takes the wedge lock of every state machine in the group, so that
// each one blocks the next time it tries to apply an entry and stays
// blocked until unwedge is called. This simulates an upper layer that is
// stuck processing something.
// Note that this is different from PauseApply, which stops the
// Raft layer from sending applies to the upper layer at all.
func (sg smGroup) wedge() {
	for _, sm := range sg {
		adder := sm.(*stateAdder)
		adder.wedge.Lock()
	}
}

// unwedge releases the wedge lock of every state machine in the group, so
// that entries which built up in the apply queue can start to apply again.
// Note that this is different from ResumeApply, which starts the
// Raft layer sending applies to the upper layer again.
func (sg smGroup) unwedge() {
	for _, sm := range sg {
		adder := sm.(*stateAdder)
		adder.wedge.Unlock()
	}
}

// Create a raft group and place on numMembers servers at random.
func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
c.t.Helper()
Expand Down Expand Up @@ -153,10 +174,11 @@ func smLoop(sm stateMachine) {
// The adder state just sums up int64 values.
type stateAdder struct {
sync.Mutex
s *Server
n RaftNode
cfg *RaftConfig
sum int64
s *Server
n RaftNode
cfg *RaftConfig
sum int64
wedge sync.Mutex
}

// Simple getters for server and the raft node.
Expand All @@ -178,6 +200,8 @@ func (a *stateAdder) propose(data []byte) {
}

func (a *stateAdder) applyEntry(ce *CommittedEntry) {
a.wedge.Lock()
defer a.wedge.Unlock()
a.Lock()
defer a.Unlock()
if ce == nil {
Expand Down
32 changes: 32 additions & 0 deletions server/raft_test.go
Expand Up @@ -291,3 +291,35 @@ func TestNRGSimpleElection(t *testing.T) {
require_Equal(t, rn.vote, vr.candidate)
}
}

// TestNRGDetectOverload wedges the upper layer of a three-node raft group,
// proposes a series of deltas through the leader and expects the leader's
// node to report itself as overloaded. Once the group is unwedged and all
// deltas have been applied, the node must no longer report overload.
func TestNRGDetectOverload(t *testing.T) {
	// Lower the threshold for the duration of the test, restoring it on exit.
	defer func(saved uint64) {
		overloadThreshold = saved
	}(overloadThreshold)
	const iterations = 32
	overloadThreshold = uint64(iterations-2) * 8

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	rg := c.createRaftGroup("TEST", 3, newStateAdder)
	rg.waitOnLeader()

	// Block the upper layer so that committed entries pile up in the
	// apply queue instead of being applied.
	rg.wedge()

	leader := rg.leader().(*stateAdder)
	node := leader.node()

	for sent := 0; sent < iterations; sent++ {
		leader.proposeDelta(1)
		time.Sleep(5 * time.Millisecond)
	}

	require_True(t, node.Overloaded())

	// Let the backlog apply; the sum reaching the number of proposals
	// means every delta went through.
	rg.unwedge()
	rg.waitOnTotal(t, int64(iterations))

	require_False(t, node.Overloaded())
}