Skip to content

Commit

Permalink
Calculate size of apply queue
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Dec 14, 2023
1 parent 45a2e98 commit 1817cd4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
27 changes: 16 additions & 11 deletions server/raft.go
Expand Up @@ -136,7 +136,6 @@ type raft struct {
werr error // Last write error

state atomic.Int32 // RaftState
overload atomic.Bool // Is the upper layer congested?
hh hash.Hash64 // Highwayhash, used for snapshots
snapfile string // Snapshot filename

Expand Down Expand Up @@ -243,7 +242,7 @@ var (
hbInterval = hbIntervalDefault
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
overloadThreshold = 8192
overloadThreshold = 1024 * 1024 * 8
)

type RaftConfig struct {
Expand Down Expand Up @@ -369,6 +368,18 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
rsrc += int64(h.Sum64())
}
}

calc := ipQueue_SizeCalculation[*CommittedEntry](func(e *CommittedEntry) int {
if e == nil {
return 0
}
sz := 8 // 64-bit index
for _, ent := range e.Entries {
sz += 1 + len(ent.Data)
}
return sz
})

n := &raft{
created: time.Now(),
id: hash[:idLen],
Expand All @@ -394,14 +405,15 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
prop: newIPQueue[*Entry](s, qpfx+"entry"),
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry", calc),
stepdown: newIPQueue[string](s, qpfx+"stepdown"),
accName: accName,
leadc: make(chan bool, 1),
observer: cfg.Observer,
extSt: ps.domainExt,
prand: rand.New(rand.NewSource(rsrc)),
}

n.c.registerWithAccount(sacc)

if atomic.LoadInt32(&s.logging.debug) > 0 {
Expand Down Expand Up @@ -920,7 +932,6 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
// Ignore if already applied.
if index > n.applied {
n.applied = index
n.updateOverloadState()
}

// Calculate the number of entries and estimate the byte size that
Expand Down Expand Up @@ -1330,18 +1341,12 @@ func (n *raft) Overloaded() bool {
if n == nil {
return false
}
return n.overload.Load()
return n.apply.size() >= overloadThreshold
}

// Pushes to the apply queue and updates the overloaded state. Lock must be held.
func (n *raft) pushToApply(ce *CommittedEntry) {
n.apply.push(ce)
n.updateOverloadState()
}

// Updates the overloaded state. Lock must be held.
func (n *raft) updateOverloadState() {
n.overload.Store(n.apply.len() >= overloadThreshold || n.commit-n.applied >= uint64(overloadThreshold))
}

// HadPreviousLeader indicates if this group ever had a leader.
Expand Down
2 changes: 1 addition & 1 deletion server/raft_test.go
Expand Up @@ -299,8 +299,8 @@ func TestNRGDetectOverload(t *testing.T) {
defer func() {
overloadThreshold = origOverloadThreshold
}()
overloadThreshold = 8
iterations := 32
overloadThreshold = (iterations - 2) * 8

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down

0 comments on commit 1817cd4

Please sign in to comment.