Report observer mode in replica state #4582

Draft
wants to merge 2 commits into base: main
2 changes: 2 additions & 0 deletions go.mod
@@ -1,5 +1,7 @@
module github.com/nats-io/nats-server/v2

replace github.com/nats-io/nats.go => github.com/nats-io/nats.go v1.30.1-0.20230925101945-66fd3c30ab7b

go 1.20

require (
4 changes: 2 additions & 2 deletions go.sum
@@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats.go v1.29.0 h1:dSXZ+SZeGyTdHVYeXimeq12FsIpb9dM8CJ2IZFiHcyE=
github.com/nats-io/nats.go v1.29.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats.go v1.30.1-0.20230925101945-66fd3c30ab7b h1:Ah3/w4+xttYxf40CtdYPYwVpW/sVppLblSE7NOKhLQg=
github.com/nats-io/nats.go v1.30.1-0.20230925101945-66fd3c30ab7b/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
15 changes: 8 additions & 7 deletions server/jetstream_cluster.go
@@ -3010,7 +3010,7 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo {
for _, rp := range node.Peers() {
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
si := sir.(nodeInfo)
pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag}
pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Observer: rp.Observer, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag}
replicas = append(replicas, pi)
}
}
@@ -4006,7 +4006,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// Select a new peer to transfer to. If we are migrating, make sure it's from the new cluster.
var npeer string
for _, r := range peers {
if !r.Current {
if !r.Current { // TODO(nat): Should this check r.Observer?
continue
}
if !migrating {
@@ -8176,11 +8176,12 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
// yet (which can happen after the whole cluster is stopped and only some
// of the nodes are restarted).
pi := &PeerInfo{
Current: current,
Offline: true,
Active: lastSeen,
Lag: rp.Lag,
Peer: rp.ID,
Current: current,
Observer: rp.Observer,
Offline: true,
Active: lastSeen,
Lag: rp.Lag,
Peer: rp.ID,
}
// If node is found, complete/update the settings.
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
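The TODO above asks whether the transfer-target loop should also skip observer peers. A minimal sketch of that check, purely illustrative and not part of this PR (the mirrored struct and helper below are hypothetical):

package main

import "fmt"

// peerInfo mirrors only the PeerInfo fields the selection loop looks at.
type peerInfo struct {
	Name     string
	Current  bool
	Observer bool
}

// pickTransferTarget returns the first peer that is current and not an
// observer, one possible answer to the TODO, not what this PR changes.
func pickTransferTarget(peers []peerInfo) (string, bool) {
	for _, r := range peers {
		if !r.Current || r.Observer {
			continue
		}
		return r.Name, true
	}
	return "", false
}

func main() {
	peers := []peerInfo{
		{Name: "S1", Current: true, Observer: true},  // current, but in observer mode
		{Name: "S2", Current: true, Observer: false}, // eligible target
	}
	if name, ok := pickTransferTarget(peers); ok {
		fmt.Println("transfer to", name)
	}
}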
41 changes: 41 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -5815,3 +5815,44 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
cancel()
wg.Wait()
}

func TestJetStreamClusterReportsObserverMode(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")

// The transferRaftLeaders function is called when a server
// enters lame duck mode (LDM) and puts all of its Raft nodes into
// observer state. Other nodes are then notified of this via the
// EntryObserverAdvisory proposal.
sl := c.streamLeader(globalAccountName, "TEST")
require_True(t, sl.transferRaftLeaders())
c.waitOnStreamLeader(globalAccountName, "TEST")
c.waitOnAllCurrent()

si, err := js.StreamInfo("TEST")
require_NoError(t, err)

// The former stream leader should now report observer state.
found := false
for _, r := range si.Cluster.Replicas {
if sl.Name() == r.Name {
found = r.Observer
break
}
}
if !found {
t.Fatalf("Expected previous stream leader %q to now be observer", sl.Name())
}
}
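For reference, a minimal client-side sketch of reading the new flag, assuming a nats.go build whose PeerInfo carries Observer (presumably the reason go.mod pins the nats.go pre-release above); the URL and stream name are placeholders:

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	si, err := js.StreamInfo("TEST")
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range si.Cluster.Replicas {
		// Observer replicas keep following the Raft log but will not
		// try to become leader.
		fmt.Printf("replica %s: current=%v observer=%v lag=%d\n",
			r.Name, r.Current, r.Observer, r.Lag)
	}
}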
55 changes: 39 additions & 16 deletions server/raft.go
@@ -90,10 +90,11 @@ type WAL interface {
}

type Peer struct {
ID string
Current bool
Last time.Time
Lag uint64
ID string
Current bool
Observer bool
Last time.Time
Lag uint64
}

type RaftState uint8
@@ -227,6 +228,7 @@ type lps struct {
ts int64
li uint64
kp bool // marks as known peer.
ob bool // marks as observer.
}

const (
@@ -237,7 +239,6 @@ const (
hbIntervalDefault = 1 * time.Second
lostQuorumIntervalDefault = hbIntervalDefault * 10 // 10 seconds
lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds

)

var (
@@ -469,12 +470,12 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
n.apply.push(nil)

// Make sure to track ourselves.
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true, false}
// Track known peers
for _, peer := range ps.knownPeers {
// Set these to 0 to start but mark as known peer.
if peer != n.id {
n.peers[peer] = &lps{0, 0, true}
n.peers[peer] = &lps{0, 0, true, false}
}
}

@@ -1463,10 +1464,11 @@ func (n *raft) Peers() []*Peer {
lag = n.commit - ps.li
}
p := &Peer{
ID: id,
Current: id == n.leader || ps.li >= n.applied,
Last: time.Unix(0, ps.ts),
Lag: lag,
ID: id,
Current: id == n.leader || ps.li >= n.applied,
Observer: ps.ob,
Last: time.Unix(0, ps.ts),
Lag: lag,
}
peers = append(peers, p)
}
@@ -1766,6 +1768,15 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) {
defer n.Unlock()
n.observer = isObserver
n.extSt = extSt

var data []byte
if isObserver {
data = append(data, 1)
} else {
data = append(data, 0)
}
data = append(data, n.id...)
n.prop.push(newEntry(EntryObserverAdvisory, data))
}

// Invoked when being notified that there is something in the entryc's queue
@@ -1934,6 +1945,7 @@ const (
EntryRemovePeer
EntryLeaderTransfer
EntrySnapshot
EntryObserverAdvisory
)

func (t EntryType) String() string {
@@ -1952,6 +1964,8 @@ func (t EntryType) String() string {
return "LeaderTransfer"
case EntrySnapshot:
return "Snapshot"
case EntryObserverAdvisory:
return "ObserverAdvisory"
}
return fmt.Sprintf("Unknown [%d]", uint8(t))
}
@@ -2617,7 +2631,7 @@ func (n *raft) applyCommit(index uint64) error {

if lp, ok := n.peers[newPeer]; !ok {
// We are not tracking this one automatically so we need to bump cluster size.
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, true}
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, true, false}
} else {
// Mark as added.
lp.kp = true
@@ -2657,6 +2671,15 @@ func (n *raft) applyCommit(index uint64) error {

// We pass these up as well.
committed = append(committed, e)

case EntryObserverAdvisory:
if len(e.Data) < 2 {
n.debug("Malformed observer advisory entry")
continue
}
if p, ok := n.peers[string(e.Data[1:])]; ok {
p.ob = e.Data[0] == 1
}
}
}
// Pass to the upper layers if we have normal entries.
@@ -2760,7 +2783,7 @@ func (n *raft) trackPeer(peer string) error {
if ps := n.peers[peer]; ps != nil {
ps.ts = time.Now().UnixNano()
} else if !isRemoved {
n.peers[peer] = &lps{time.Now().UnixNano(), 0, false}
n.peers[peer] = &lps{time.Now().UnixNano(), 0, false, false}
}
n.Unlock()

@@ -3015,7 +3038,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if ps := n.peers[ae.leader]; ps != nil {
ps.ts = time.Now().UnixNano()
} else {
n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0, true}
n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0, true, false}
}
}

@@ -3222,7 +3245,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if ps := n.peers[newPeer]; ps != nil {
ps.ts = time.Now().UnixNano()
} else {
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false}
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false, false}
}
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)
@@ -3270,7 +3293,7 @@ func (n *raft) processPeerState(ps *peerState) {
lp.kp = true
n.peers[peer] = lp
} else {
n.peers[peer] = &lps{0, 0, true}
n.peers[peer] = &lps{0, 0, true, false}
}
}
n.debug("Update peers from leader to %+v", n.peers)
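For reference, the EntryObserverAdvisory payload proposed in setObserver above is one flag byte (1 for observer, 0 otherwise) followed by the proposing node's ID, and applyCommit rejects anything shorter than two bytes. A standalone sketch of that encoding and decoding (helper names are illustrative, not part of the server code):

package main

import (
	"errors"
	"fmt"
)

// encodeObserverAdvisory builds the payload that setObserver proposes:
// one flag byte followed by the node ID.
func encodeObserverAdvisory(isObserver bool, id string) []byte {
	data := make([]byte, 0, 1+len(id))
	if isObserver {
		data = append(data, 1)
	} else {
		data = append(data, 0)
	}
	return append(data, id...)
}

// decodeObserverAdvisory mirrors the check in applyCommit: at least two
// bytes, flag first, node ID after.
func decodeObserverAdvisory(data []byte) (isObserver bool, id string, err error) {
	if len(data) < 2 {
		return false, "", errors.New("malformed observer advisory entry")
	}
	return data[0] == 1, string(data[1:]), nil
}

func main() {
	payload := encodeObserverAdvisory(true, "yrzKKRBu") // sample node ID
	ob, id, err := decodeObserverAdvisory(payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("node %s observer=%v\n", id, ob)
}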
13 changes: 7 additions & 6 deletions server/stream.go
@@ -168,12 +168,13 @@ type ClusterInfo struct {
// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
Peer string `json:"peer"`
Name string `json:"name"`
Current bool `json:"current"`
Observer bool `json:"observer,omitempty"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
Peer string `json:"peer"`
// For migrations.
cluster string
}
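To show the wire-level effect of the new tag, here is a small sketch using a local mirror of PeerInfo's exported fields: observer peers gain an "observer":true entry, while omitempty leaves the JSON of non-observer peers unchanged. Peer names and IDs are made up:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// peerInfo mirrors the exported fields and JSON tags of server.PeerInfo.
type peerInfo struct {
	Name     string        `json:"name"`
	Current  bool          `json:"current"`
	Observer bool          `json:"observer,omitempty"`
	Offline  bool          `json:"offline,omitempty"`
	Active   time.Duration `json:"active"`
	Lag      uint64        `json:"lag,omitempty"`
	Peer     string        `json:"peer"`
}

func main() {
	peers := []peerInfo{
		{Name: "S1", Current: true, Observer: true, Active: 250 * time.Millisecond, Peer: "yrzKKRBu"},
		{Name: "S2", Current: true, Active: 120 * time.Millisecond, Peer: "cnrtt3eg"},
	}
	for _, p := range peers {
		b, _ := json.Marshal(p)
		// S1 includes "observer":true; S2 omits the field entirely.
		fmt.Println(string(b))
	}
}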