Merge pull request #13683 from serathius/publishV3
server: Switch to publishV3
ptabor committed Feb 21, 2022
2 parents b976074 + 8c91d60 commit 6105a6f
Showing 2 changed files with 1 addition and 190 deletions.
67 changes: 1 addition & 66 deletions server/etcdserver/server.go
@@ -511,9 +511,7 @@ func (s *EtcdServer) adjustTicks() {
 func (s *EtcdServer) Start() {
     s.start()
     s.GoAttach(func() { s.adjustTicks() })
-    // TODO: Switch to publishV3 in 3.6.
-    // Support for cluster_member_set_attr was added in 3.5.
-    s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
+    s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
     s.GoAttach(s.purgeFile)
     s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
     s.GoAttach(s.monitorClusterVersions)
@@ -1698,69 +1696,6 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
     }
 }
 
-// publish registers server information into the cluster. The information
-// is the JSON representation of this server's member struct, updated with the
-// static clientURLs of the server.
-// The function keeps attempting to register until it succeeds,
-// or its server is stopped.
-//
-// Use v2 store to encode member attributes, and apply through Raft
-// but does not go through v2 API endpoint, which means cluster can still
-// process publish requests through rafthttp
-// TODO: Remove in 3.6 (start using publishV3)
-func (s *EtcdServer) publish(timeout time.Duration) {
-    lg := s.Logger()
-    b, err := json.Marshal(s.attributes)
-    if err != nil {
-        lg.Panic("failed to marshal JSON", zap.Error(err))
-        return
-    }
-    req := pb.Request{
-        Method: "PUT",
-        Path:   membership.MemberAttributesStorePath(s.id),
-        Val:    string(b),
-    }
-
-    for {
-        ctx, cancel := context.WithTimeout(s.ctx, timeout)
-        _, err := s.Do(ctx, req)
-        cancel()
-        switch err {
-        case nil:
-            close(s.readych)
-            lg.Info(
-                "published local member to cluster through raft",
-                zap.String("local-member-id", s.ID().String()),
-                zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
-                zap.String("request-path", req.Path),
-                zap.String("cluster-id", s.cluster.ID().String()),
-                zap.Duration("publish-timeout", timeout),
-            )
-            return
-
-        case ErrStopped:
-            lg.Warn(
-                "stopped publish because server is stopped",
-                zap.String("local-member-id", s.ID().String()),
-                zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
-                zap.Duration("publish-timeout", timeout),
-                zap.Error(err),
-            )
-            return
-
-        default:
-            lg.Warn(
-                "failed to publish local member to cluster through raft",
-                zap.String("local-member-id", s.ID().String()),
-                zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
-                zap.String("request-path", req.Path),
-                zap.Duration("publish-timeout", timeout),
-                zap.Error(err),
-            )
-        }
-    }
-}
-
 func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
     atomic.AddInt64(&s.inflightSnapshots, 1)
 
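The body of publishV3 is not part of this diff; only its closing braces appear at the top of the hunk above. For orientation, here is a minimal sketch of a v3-style publish loop, modeled on the removed v2 publish and on the deleted TODO's note that cluster_member_set_attr support arrived in 3.5. The request type, its field names, and the raftRequest helper are assumptions about the v3 apply path, not code taken from this commit.

// Sketch only: a v3 publish loop as it plausibly looks, reusing this file's
// existing imports. membershippb.ClusterMemberAttrSetRequest, its fields, and
// s.raftRequest are assumed names, not verified against this commit.
func (s *EtcdServer) publishV3Sketch(timeout time.Duration) {
    req := &membershippb.ClusterMemberAttrSetRequest{ // assumed request type
        Member_ID: uint64(s.id),
        MemberAttributes: &membershippb.Attributes{
            Name:       s.attributes.Name,
            ClientUrls: s.attributes.ClientURLs,
        },
    }
    lg := s.Logger()
    for {
        // Stop retrying once the server begins shutting down.
        select {
        case <-s.stopping:
            lg.Warn("stopped publish because server is stopping")
            return
        default:
        }
        ctx, cancel := context.WithTimeout(s.ctx, timeout)
        _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req}) // assumed apply path
        cancel()
        if err == nil {
            // Same readiness signal the removed v2 publish closed on success.
            close(s.readych)
            lg.Info("published local member to cluster through raft")
            return
        }
        lg.Warn("failed to publish local member, retrying", zap.Error(err))
    }
}

The retry-until-success shape and the close of readych mirror the v2 publish removed above; only the payload and the apply path change.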
124 changes: 0 additions & 124 deletions server/etcdserver/server_test.go
@@ -1473,130 +1473,6 @@ func TestUpdateMember(t *testing.T) {
 
 // TODO: test server could stop itself when being removed
 
-func TestPublish(t *testing.T) {
-    lg := zaptest.NewLogger(t)
-    n := newNodeRecorder()
-    ch := make(chan interface{}, 1)
-    // simulate that request has gone through consensus
-    ch <- Response{}
-    w := wait.NewWithResponse(ch)
-    ctx, cancel := context.WithCancel(context.Background())
-    srv := &EtcdServer{
-        lgMu:       new(sync.RWMutex),
-        lg:         lg,
-        readych:    make(chan struct{}),
-        Cfg:        config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
-        id:         1,
-        r:          *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
-        attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
-        cluster:    &membership.RaftCluster{},
-        w:          w,
-        reqIDGen:   idutil.NewGenerator(0, time.Time{}),
-        SyncTicker: &time.Ticker{},
-
-        ctx:    ctx,
-        cancel: cancel,
-    }
-    srv.publish(time.Hour)
-
-    action := n.Action()
-    if len(action) != 1 {
-        t.Fatalf("len(action) = %d, want 1", len(action))
-    }
-    if action[0].Name != "Propose" {
-        t.Fatalf("action = %s, want Propose", action[0].Name)
-    }
-    data := action[0].Params[0].([]byte)
-    var r pb.Request
-    if err := r.Unmarshal(data); err != nil {
-        t.Fatalf("unmarshal request error: %v", err)
-    }
-    if r.Method != "PUT" {
-        t.Errorf("method = %s, want PUT", r.Method)
-    }
-    wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
-    if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
-        t.Errorf("path = %s, want %s", r.Path, wpath)
-    }
-    var gattr membership.Attributes
-    if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
-        t.Fatalf("unmarshal val error: %v", err)
-    }
-    if !reflect.DeepEqual(gattr, wm.Attributes) {
-        t.Errorf("member = %v, want %v", gattr, wm.Attributes)
-    }
-}
-
-// TestPublishStopped tests that publish will be stopped if server is stopped.
-func TestPublishStopped(t *testing.T) {
-    lg := zaptest.NewLogger(t)
-    ctx, cancel := context.WithCancel(context.Background())
-    r := newRaftNode(raftNodeConfig{
-        lg:        lg,
-        Node:      newNodeNop(),
-        transport: newNopTransporter(),
-    })
-    srv := &EtcdServer{
-        lgMu:       new(sync.RWMutex),
-        lg:         lg,
-        Cfg:        config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
-        r:          *r,
-        cluster:    &membership.RaftCluster{},
-        w:          mockwait.NewNop(),
-        done:       make(chan struct{}),
-        stopping:   make(chan struct{}),
-        stop:       make(chan struct{}),
-        reqIDGen:   idutil.NewGenerator(0, time.Time{}),
-        SyncTicker: &time.Ticker{},
-
-        ctx:    ctx,
-        cancel: cancel,
-    }
-    close(srv.stopping)
-    srv.publish(time.Hour)
-}
-
-// TestPublishRetry tests that publish will keep retry until success.
-func TestPublishRetry(t *testing.T) {
-    lg := zaptest.NewLogger(t)
-
-    ctx, cancel := context.WithCancel(context.Background())
-    n := newNodeRecorderStream()
-    srv := &EtcdServer{
-        lgMu:       new(sync.RWMutex),
-        lg:         lg,
-        Cfg:        config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
-        r:          *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
-        w:          mockwait.NewNop(),
-        stopping:   make(chan struct{}),
-        reqIDGen:   idutil.NewGenerator(0, time.Time{}),
-        SyncTicker: &time.Ticker{},
-        ctx:        ctx,
-        cancel:     cancel,
-    }
-    // expect multiple proposals from retrying
-    ch := make(chan struct{})
-    go func() {
-        defer close(ch)
-        if action, err := n.Wait(2); err != nil {
-            t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
-        }
-        close(srv.stopping)
-        // drain remaining actions, if any, so publish can terminate
-        for {
-            select {
-            case <-ch:
-                return
-            default:
-                n.Action()
-            }
-        }
-    }()
-    srv.publish(10 * time.Nanosecond)
-    ch <- struct{}{}
-    <-ch
-}
-
 func TestPublishV3(t *testing.T) {
     n := newNodeRecorder()
     ch := make(chan interface{}, 1)
(diff truncated; the remainder of TestPublishV3 is not shown)
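As a hedged illustration of what the surviving v3 counterpart of the deleted TestPublish asserts, the tail of such a test would plausibly decode the proposal as an InternalRaftRequest rather than a v2 pb.Request. The field names below are assumptions, not the actual test body from this commit.

// Sketch only: the assertion tail of a v3 publish test, mirroring the deleted
// TestPublish above. pb.InternalRaftRequest and its ClusterMemberAttrSet field
// are assumed names, not verified against this commit.
action := n.Action()
if len(action) != 1 || action[0].Name != "Propose" {
    t.Fatalf("action = %+v, want a single Propose", action)
}
data := action[0].Params[0].([]byte)
var r pb.InternalRaftRequest
if err := r.Unmarshal(data); err != nil {
    t.Fatalf("unmarshal request error: %v", err)
}
if r.ClusterMemberAttrSet == nil {
    t.Errorf("expected a ClusterMemberAttrSet request, got %+v", r)
}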
