Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AddPeer API #5123

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
96 changes: 96 additions & 0 deletions server/jetstream_api.go
Expand Up @@ -195,6 +195,12 @@ const (
// Will return JSON response.
JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"

// JSApiAddServer is the endpoint to add a peer server to the cluster.
// Only allow to add peer previously removed.
// Only works from system account.
// Will return JSON response.
JSApiAddServer = "$JS.API.SERVER.ADD"

// JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
// Only works from system account.
// Will return JSON response.
Expand Down Expand Up @@ -591,6 +597,23 @@ type JSApiLeaderStepDownResponse struct {

const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"

// JSApiMetaServerAddRequest will add a peer to the meta group.
type JSApiMetaServerAddRequest struct {
// Server name of the peer to be added.
Server string `json:"peer"`
// Peer ID of the peer to be added. If specified this is used
// instead of the server name.
Peer string `json:"peer_id,omitempty"`
}

// JSApiMetaServerAddResponse is the response to a peer addition request in the meta group.
type JSApiMetaServerAddResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}

const JSApiMetaServerAddResponseType = "io.nats.jetstream.api.v1.meta_server_add_response"

// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
type JSApiMetaServerRemoveRequest struct {
// Server name of the peer to be removed.
Expand Down Expand Up @@ -757,6 +780,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// Ignore system level directives meta stepdown and peer remove requests here.
if subject == JSApiLeaderStepDown ||
subject == JSApiRemoveServer ||
subject == JSApiAddServer ||
strings.HasPrefix(subject, jsAPIAccountPre) {
return
}
Expand Down Expand Up @@ -2292,6 +2316,78 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

// Request to have the metaleader add a peer to the system.
func (s *Server) jsLeaderServerAddRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}

ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
if err != nil {
s.Warnf(badAPIRequestT, msg)
return
}

js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
return
}

js.mu.RLock()
isLeader := cc.isLeader()
js.mu.RUnlock()

if !isLeader {
return
}

var resp = JSApiMetaServerAddResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerAddResponseType}}

if isEmptyRequest(msg) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

var req JSApiMetaServerAddRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

var found string
js.mu.RLock()
// Use nodeToInfo because cc.meta.Peers() does not have peer.
if req.Peer != _EMPTY_ {
_, ok := s.nodeToInfo.Load(req.Peer)
if ok {
found = req.Peer
}
} else {
s.nodeToInfo.Range(func(id, ni interface{}) bool {
if req.Server == ni.(nodeInfo).name {
found = id.(string)
return false
}
return true
})
}
js.mu.RUnlock()

if found == _EMPTY_ {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}

js.mu.Lock()
cc.meta.ProposeAddPeer(found)
js.mu.Unlock()

resp.Success = true
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}

// Request to have the metaleader remove a peer from the system.
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
Expand Down
5 changes: 5 additions & 0 deletions server/jetstream_cluster.go
Expand Up @@ -59,6 +59,8 @@ type jetStreamCluster struct {
consumerResults *subscription
// System level request to have the leader stepdown.
stepdown *subscription
// System level requests to add a peer.
peerAdd *subscription
// System level requests to remove a peer.
peerRemove *subscription
// System level request to move a stream
Expand Down Expand Up @@ -5310,6 +5312,9 @@ func (js *jetStream) startUpdatesSub() {
if cc.stepdown == nil {
cc.stepdown, _ = s.systemSubscribe(JSApiLeaderStepDown, _EMPTY_, false, c, s.jsLeaderStepDownRequest)
}
if cc.peerAdd == nil {
cc.peerAdd, _ = s.systemSubscribe(JSApiAddServer, _EMPTY_, false, c, s.jsLeaderServerAddRequest)
}
if cc.peerRemove == nil {
cc.peerRemove, _ = s.systemSubscribe(JSApiRemoveServer, _EMPTY_, false, c, s.jsLeaderServerRemoveRequest)
}
Expand Down
40 changes: 40 additions & 0 deletions server/jetstream_cluster_1_test.go
Expand Up @@ -3568,6 +3568,10 @@ func TestJetStreamClusterPeerRemovalAndStreamReassignment(t *testing.T) {
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
if !resp.Success {
t.Fatal("Unexpected success")
}

// In case that server was also meta-leader.
c.waitOnLeader()

Expand Down Expand Up @@ -3600,6 +3604,42 @@ func TestJetStreamClusterPeerRemovalAndStreamReassignment(t *testing.T) {
}
return nil
})

// Restore the node.
toAdd := toRemove
reqAdd := JSApiMetaServerAddRequest{Server: toAdd}
jsAddReq, err := json.Marshal(reqAdd)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

rAddMsg, err := nc.Request(JSApiAddServer, jsAddReq, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var addResp JSApiMetaServerAddResponse
if err := json.Unmarshal(rAddMsg.Data, &addResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if addResp.Error != nil {
t.Fatalf("Unexpected error: %+v", addResp.Error)
}

if !addResp.Success {
t.Fatal("Unexpected success")
}

// In case that server was also meta-leader.
c.waitOnLeader()

checkFor(t, 15*time.Second, 250*time.Millisecond, func() error {
for _, s := range ml.JetStreamClusterPeers() {
if s == toAdd {
return nil
}
}
return fmt.Errorf("Server not in the peer list")
})
}

func TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace(t *testing.T) {
Expand Down