Add advisory proposal for reporting observer state
Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander committed Sep 27, 2023
1 parent 38d0bbe commit fe84762
Showing 4 changed files with 75 additions and 11 deletions.
2 changes: 2 additions & 0 deletions go.mod
@@ -1,5 +1,7 @@
module github.com/nats-io/nats-server/v2

replace github.com/nats-io/nats.go => github.com/nats-io/nats.go v1.30.1-0.20230925101945-66fd3c30ab7b

go 1.20

require (
4 changes: 2 additions & 2 deletions go.sum
@@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
-github.com/nats-io/nats.go v1.29.0 h1:dSXZ+SZeGyTdHVYeXimeq12FsIpb9dM8CJ2IZFiHcyE=
-github.com/nats-io/nats.go v1.29.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
+github.com/nats-io/nats.go v1.30.1-0.20230925101945-66fd3c30ab7b h1:Ah3/w4+xttYxf40CtdYPYwVpW/sVppLblSE7NOKhLQg=
+github.com/nats-io/nats.go v1.30.1-0.20230925101945-66fd3c30ab7b/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
41 changes: 41 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -5815,3 +5815,44 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
cancel()
wg.Wait()
}

func TestJetStreamClusterReportsObserverMode(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")

// The transferRaftLeaders function is called when a server
// enters lame duck mode (LDM) and puts all of its Raft nodes into
// observer state. The other nodes are then notified of this via an
// EntryObserverAdvisory proposal.
sl := c.streamLeader(globalAccountName, "TEST")
require_True(t, sl.transferRaftLeaders())
c.waitOnStreamLeader(globalAccountName, "TEST")
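// Wait for all servers to be current so that the observer advisory has
// been committed and applied everywhere before we query the stream info.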
c.waitOnAllCurrent()

si, err := js.StreamInfo("TEST")
require_NoError(t, err)

// The former stream leader should now report observer state.
found := false
for _, r := range si.Cluster.Replicas {
if sl.Name() == r.Name {
found = r.Observer
break
}
}
if !found {
t.Fatalf("Expected previous stream leader %q to now be observer", sl.Name())
}
}
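
For reference, here is a minimal client-side sketch of how the new flag surfaces to applications. It assumes the nats.go pre-release pinned in go.mod above (which exposes the Observer field on stream replica info), a JetStream-enabled server reachable at the default URL, and an existing replicated stream named "TEST"; the URL and stream name are illustrative only:

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect and obtain a JetStream context.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Fetch stream info and report any replicas currently running as
	// observers, e.g. peers on servers that have entered lame duck mode.
	si, err := js.StreamInfo("TEST")
	if err != nil {
		log.Fatal(err)
	}
	if si.Cluster != nil {
		for _, r := range si.Cluster.Replicas {
			if r.Observer {
				fmt.Printf("replica %q is in observer mode\n", r.Name)
			}
		}
	}
}
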
39 changes: 30 additions & 9 deletions server/raft.go
@@ -228,6 +228,7 @@ type lps struct {
ts int64
li uint64
kp bool // marks as known peer.
ob bool // marks as observer.
}

const (
@@ -238,7 +239,6 @@
hbIntervalDefault = 1 * time.Second
lostQuorumIntervalDefault = hbIntervalDefault * 10 // 10 seconds
lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds

)

var (
@@ -470,12 +470,12 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
n.apply.push(nil)

// Make sure to track ourselves.
-n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}
+n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true, false}
// Track known peers
for _, peer := range ps.knownPeers {
// Set these to 0 to start but mark as known peer.
if peer != n.id {
-n.peers[peer] = &lps{0, 0, true}
+n.peers[peer] = &lps{0, 0, true, false}
}
}

@@ -1466,7 +1466,7 @@ func (n *raft) Peers() []*Peer {
p := &Peer{
ID: id,
Current: id == n.leader || ps.li >= n.applied,
-Observer: n.observer,
+Observer: ps.ob,
Last: time.Unix(0, ps.ts),
Lag: lag,
}
@@ -1768,6 +1768,15 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) {
defer n.Unlock()
n.observer = isObserver
n.extSt = extSt

var data []byte
if isObserver {
data = append(data, 1)
} else {
data = append(data, 0)
}
data = append(data, n.id...)
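// Propose an advisory so the rest of the group learns about our new
// observer state (the flag byte above followed by our node ID).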
n.prop.push(newEntry(EntryObserverAdvisory, data))
}

// Invoked when being notified that there is something in the entryc's queue
@@ -1936,6 +1945,7 @@ const (
EntryRemovePeer
EntryLeaderTransfer
EntrySnapshot
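// EntryObserverAdvisory informs the group that a peer's observer state has changed.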
EntryObserverAdvisory
)

func (t EntryType) String() string {
@@ -1954,6 +1964,8 @@ func (t EntryType) String() string {
return "LeaderTransfer"
case EntrySnapshot:
return "Snapshot"
case EntryObserverAdvisory:
return "ObserverAdvisory"
}
return fmt.Sprintf("Unknown [%d]", uint8(t))
}
@@ -2619,7 +2631,7 @@ func (n *raft) applyCommit(index uint64) error {

if lp, ok := n.peers[newPeer]; !ok {
// We are not tracking this one automatically so we need to bump cluster size.
-n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, true}
+n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, true, false}
} else {
// Mark as added.
lp.kp = true
@@ -2659,6 +2671,15 @@

// We pass these up as well.
committed = append(committed, e)

case EntryObserverAdvisory:
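// Payload is a single observer flag byte followed by the peer's node ID;
// update our view of that peer's observer state.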
if len(e.Data) < 2 {
n.debug("Malformed observer advisory entry")
continue
}
if p, ok := n.peers[string(e.Data[1:])]; ok {
p.ob = e.Data[0] == 1
}
}
}
// Pass to the upper layers if we have normal entries.
@@ -2762,7 +2783,7 @@ func (n *raft) trackPeer(peer string) error {
if ps := n.peers[peer]; ps != nil {
ps.ts = time.Now().UnixNano()
} else if !isRemoved {
-n.peers[peer] = &lps{time.Now().UnixNano(), 0, false}
+n.peers[peer] = &lps{time.Now().UnixNano(), 0, false, false}
}
n.Unlock()

@@ -3017,7 +3038,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if ps := n.peers[ae.leader]; ps != nil {
ps.ts = time.Now().UnixNano()
} else {
-n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0, true}
+n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0, true, false}
}
}

@@ -3224,7 +3245,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if ps := n.peers[newPeer]; ps != nil {
ps.ts = time.Now().UnixNano()
} else {
-n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false}
+n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false, false}
}
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)
@@ -3272,7 +3293,7 @@ func (n *raft) processPeerState(ps *peerState) {
lp.kp = true
n.peers[peer] = lp
} else {
-n.peers[peer] = &lps{0, 0, true}
+n.peers[peer] = &lps{0, 0, true, false}
}
}
n.debug("Update peers from leader to %+v", n.peers)
