New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NRG: Drop append entries when upper layer is overloaded #4735
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,6 +73,7 @@ type RaftNode interface { | |
LeadChangeC() <-chan bool | ||
QuitC() <-chan struct{} | ||
Created() time.Time | ||
Overloaded() bool | ||
Stop() | ||
Delete() | ||
Wipe() | ||
|
@@ -235,13 +236,14 @@ const ( | |
) | ||
|
||
var ( | ||
minElectionTimeout = minElectionTimeoutDefault | ||
maxElectionTimeout = maxElectionTimeoutDefault | ||
minCampaignTimeout = minCampaignTimeoutDefault | ||
maxCampaignTimeout = maxCampaignTimeoutDefault | ||
hbInterval = hbIntervalDefault | ||
lostQuorumInterval = lostQuorumIntervalDefault | ||
lostQuorumCheck = lostQuorumCheckIntervalDefault | ||
minElectionTimeout = minElectionTimeoutDefault | ||
maxElectionTimeout = maxElectionTimeoutDefault | ||
minCampaignTimeout = minCampaignTimeoutDefault | ||
maxCampaignTimeout = maxCampaignTimeoutDefault | ||
hbInterval = hbIntervalDefault | ||
lostQuorumInterval = lostQuorumIntervalDefault | ||
lostQuorumCheck = lostQuorumCheckIntervalDefault | ||
overloadThreshold uint64 = 1024 * 1024 * 8 | ||
) | ||
|
||
type RaftConfig struct { | ||
|
@@ -336,6 +338,19 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer | |
return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined}) | ||
} | ||
|
||
// calculateCommittedEntrySizeForIPQ is passed into the ipQueue to | ||
// help with tracking the size of the committed entries in the queue. | ||
func calculateCommittedEntrySizeForIPQ(e *CommittedEntry) uint64 { | ||
if e == nil { | ||
return 0 | ||
} | ||
sz := uint64(8) // 64-bit index | ||
for _, ent := range e.Entries { | ||
sz += uint64(1 + len(ent.Data)) | ||
} | ||
return sz | ||
} | ||
|
||
// startRaftNode will start the raft node. | ||
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) { | ||
if cfg == nil { | ||
|
@@ -360,6 +375,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe | |
return nil, errNoPeerState | ||
} | ||
|
||
calc := ipQueue_SizeCalculation[*CommittedEntry](calculateCommittedEntrySizeForIPQ) | ||
qpfx := fmt.Sprintf("[ACC:%s] RAFT '%s' ", accName, cfg.Name) | ||
n := &raft{ | ||
created: time.Now(), | ||
|
@@ -386,12 +402,13 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe | |
prop: newIPQueue[*Entry](s, qpfx+"entry"), | ||
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"), | ||
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"), | ||
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"), | ||
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry", calc), | ||
accName: accName, | ||
leadc: make(chan bool, 1), | ||
observer: cfg.Observer, | ||
extSt: ps.domainExt, | ||
} | ||
|
||
n.c.registerWithAccount(sacc) | ||
|
||
if atomic.LoadInt32(&s.logging.debug) > 0 { | ||
|
@@ -473,6 +490,9 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe | |
} | ||
} | ||
|
||
// Send nil entry to signal the upper layers we are done doing replay/restore. | ||
n.pushToApply(nil) | ||
|
||
// Make sure to track ourselves. | ||
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true} | ||
|
||
|
@@ -1178,7 +1198,7 @@ func (n *raft) setupLastSnapshot() { | |
n.pterm = snap.lastTerm | ||
n.commit = snap.lastIndex | ||
n.applied = snap.lastIndex | ||
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) | ||
n.pushToApply(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) | ||
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { | ||
n.setWriteErrLocked(err) | ||
} | ||
|
@@ -1375,6 +1395,18 @@ func (n *raft) Healthy() bool { | |
return n.isCurrent(true) | ||
} | ||
|
||
func (n *raft) Overloaded() bool { | ||
if n == nil { | ||
return false | ||
} | ||
return n.apply.size() >= overloadThreshold | ||
} | ||
|
||
// Pushes to the apply queue and updates the overloaded state. Lock must be held. | ||
func (n *raft) pushToApply(ce *CommittedEntry) { | ||
n.apply.push(ce) | ||
} | ||
|
||
// HadPreviousLeader indicates if this group ever had a leader. | ||
func (n *raft) HadPreviousLeader() bool { | ||
n.RLock() | ||
|
@@ -2803,7 +2835,7 @@ func (n *raft) applyCommit(index uint64) error { | |
if fpae { | ||
delete(n.pae, index) | ||
} | ||
n.apply.push(newCommittedEntry(index, committed)) | ||
n.pushToApply(newCommittedEntry(index, committed)) | ||
} else { | ||
// If we processed inline update our applied index. | ||
n.applied = index | ||
|
@@ -2980,6 +3012,13 @@ func (n *raft) runAsCandidate() { | |
// handleAppendEntry handles an append entry from the wire. This function | ||
// is an internal callback from the "asubj" append entry subscription. | ||
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { | ||
// If we are overwhelmed, i.e. the upper layer is not applying entries | ||
// fast enough and our apply queue is building up, start to drop new | ||
// append entries instead. | ||
if n.Overloaded() { | ||
neilalexander marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just check n.apply.len()? Always present, so do not need to lock the raft group, and ipq underneath has its own which we use here anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It takes the ipq lock, so we are now taking it twice in that function I think. |
||
return | ||
} | ||
|
||
msg = copyBytes(msg) | ||
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil { | ||
// Push to the new entry channel. From here one of the worker | ||
|
@@ -3299,7 +3338,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { | |
} | ||
|
||
// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. | ||
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) | ||
n.pushToApply(newCommittedEntry(n.commit, ae.entries[:1])) | ||
n.Unlock() | ||
return | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this function returns pending, so could short circuit some work here.