Skip to content

Commit

Permalink
Update ForceLeave Prune (#580)
Browse files Browse the repository at this point in the history
* Prune flag on ForceLeave added. Refactor of `reap` function to call to `eraseNode`.
  • Loading branch information
schristoff committed Oct 2, 2019
1 parent d014479 commit 1d3fdf9
Show file tree
Hide file tree
Showing 4 changed files with 486 additions and 22 deletions.
32 changes: 24 additions & 8 deletions serf/serf.go
Expand Up @@ -1102,6 +1102,10 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
case StatusAlive:
member.Status = StatusLeaving
member.statusLTime = leaveMsg.LTime

if leaveMsg.Prune {
s.handlePrune(member)
}
return true
case StatusFailed:
member.Status = StatusLeft
Expand All @@ -1127,26 +1131,38 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
}

if leaveMsg.Prune {
s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
s.leftMembers = removeOldMember(s.leftMembers, member.Name)
s.eraseNode(member)
s.handlePrune(member)
}

return true

case StatusLeft:
case StatusLeaving, StatusLeft:
if leaveMsg.Prune {
s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
s.leftMembers = removeOldMember(s.leftMembers, member.Name)
s.eraseNode(member)
s.handlePrune(member)
}
return true

default:
return false
}
}

// handlePrune waits for nodes that are leaving and then forcibly
// erases a member from the list of members
func (s *Serf) handlePrune(member *memberState) {
if member.Status == StatusLeaving {
time.Sleep(s.config.BroadcastTimeout + s.config.LeavePropagateDelay)
}

s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)

//If we are leaving or left we may be in that list of members
if member.Status == StatusLeaving || member.Status == StatusLeft {
s.leftMembers = removeOldMember(s.leftMembers, member.Name)
}
s.eraseNode(member)

}

// handleNodeJoinIntent is called when a node broadcasts a
// join message to set the lamport time of its join
func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
Expand Down
226 changes: 212 additions & 14 deletions serf/serf_test.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/testutil"
"github.com/hashicorp/serf/testutil/retry"
)

func testConfig() *Config {
Expand Down Expand Up @@ -65,6 +66,27 @@ func testMember(t *testing.T, members []Member, name string, status MemberStatus
panic(fmt.Sprintf("node not found: %s", name))
}

// testMemberStatus is testMember but returns an error
// instead of failing the test
func testMemberStatus(members []Member, name string, status MemberStatus) error {
for _, m := range members {
if m.Name == name {
if m.Status != status {
return fmt.Errorf("bad state for %s: %d", name, m.Status)
}
return nil
}
}

if status == StatusNone {
// We didn't expect to find it
return nil
}

return fmt.Errorf("node not found: %s", name)

}

func TestCreate_badProtocolVersion(t *testing.T) {
cases := []struct {
version uint8
Expand Down Expand Up @@ -500,7 +522,7 @@ func TestSerf_leaveRejoinDifferentRole(t *testing.T) {
t.Fatalf("s1 members: %d", len(s1.Members()))
}

var member *Member = nil
var member *Member
for _, m := range members {
if m.Name == s3Config.NodeName {
member = &m
Expand All @@ -517,6 +539,182 @@ func TestSerf_leaveRejoinDifferentRole(t *testing.T) {
}
}

func TestSerf_forceLeaveFailed(t *testing.T) {
s1Config := testConfig()
s2Config := testConfig()
s3Config := testConfig()

s1, err := Create(s1Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s1.Shutdown()

s2, err := Create(s2Config)
if err != nil {
t.Fatalf("err: %s", err)
}

defer s2.Shutdown()

s3, err := Create(s3Config)
if err != nil {
t.Fatalf("err: %s", err)
}

defer s3.Shutdown()

_, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, false)
if err != nil {
t.Fatalf("err: %s", err)
}

_, err = s1.Join([]string{s3Config.MemberlistConfig.BindAddr}, false)
if err != nil {
t.Fatalf("err: %s", err)
}

//Put s2 in failed state
s2.Shutdown()

retry.Run(t, func(r *retry.R) {
if err := testMemberStatus(s1.Members(), s2Config.NodeName, StatusFailed); err != nil {
r.Fatal(err)
}
})
s1.forceLeave(s2.config.NodeName, true)

memberlen := len(s1.Members())
if memberlen != 2 {
t.Fatalf("wanted 2 alive members, got %v", s1.Members())
}

}

func TestSerf_forceLeaveLeaving(t *testing.T) {
s1Config := testConfig()
s2Config := testConfig()
s3Config := testConfig()

//make it so it doesn't get reaped
// allow for us to see the leaving state
s1Config.TombstoneTimeout = 1 * time.Hour
s1Config.LeavePropagateDelay = 5 * time.Second

s2Config.TombstoneTimeout = 1 * time.Hour
s2Config.LeavePropagateDelay = 5 * time.Second

s3Config.TombstoneTimeout = 1 * time.Hour
s3Config.LeavePropagateDelay = 5 * time.Second

s1, err := Create(s1Config)
if err != nil {
t.Fatalf("err: %s", err)
}

defer s1.Shutdown()

s2, err := Create(s2Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s2.Shutdown()

s3, err := Create(s3Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s3.Shutdown()

_, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, true)
if err != nil {
t.Fatalf("err: %s", err)
}
testutil.Yield()

_, err = s1.Join([]string{s3Config.MemberlistConfig.BindAddr}, true)
if err != nil {
t.Fatalf("err: %s", err)
}
testutil.Yield()

//Put s2 in left state
if err := s2.Leave(); err != nil {
t.Fatal(err)
}

retry.Run(t, func(r *retry.R) {
if err := testMemberStatus(s1.Members(), s2Config.NodeName, 3); err != nil {
r.Fatal(err)
}
})
s1.forceLeave(s2.config.NodeName, true)

memberlen := len(s1.Members())
if memberlen != 2 {
t.Fatalf("wanted 2 alive members, got %v", s1.Members())
}
}

func TestSerf_forceLeaveLeft(t *testing.T) {
s1Config := testConfig()
s2Config := testConfig()
s3Config := testConfig()

//make it so it doesn't get reaped
s1Config.TombstoneTimeout = 1 * time.Hour
s2Config.TombstoneTimeout = 1 * time.Hour
s3Config.TombstoneTimeout = 1 * time.Hour

s1, err := Create(s1Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s1.Shutdown()

s2, err := Create(s2Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s2.Shutdown()

s3, err := Create(s3Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s3.Shutdown()

_, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, true)
if err != nil {
t.Fatalf("err: %s", err)
}
testutil.Yield()

_, err = s1.Join([]string{s3Config.MemberlistConfig.BindAddr}, true)
if err != nil {
t.Fatalf("err: %s", err)
}
testutil.Yield()

//Put s2 in left state
if err := s2.Leave(); err != nil {
t.Fatal(err)
}

retry.Run(t, func(r *retry.R) {
if err := testMemberStatus(s1.Members(), s2Config.NodeName, StatusLeft); err != nil {
r.Fatal(err)
}
})
s1.forceLeave(s2.config.NodeName, true)

memberlen := len(s1.Members())
if memberlen != 2 {
t.Fatalf("wanted 2 alive members, got %v", s1.Members())
}

}

func TestSerf_reconnect(t *testing.T) {
eventCh := make(chan Event, 64)
s1Config := testConfig()
Expand Down Expand Up @@ -672,7 +870,7 @@ func TestSerf_update(t *testing.T) {

// Add a tag to force an update event, and add a version downgrade as
// well (that alone won't trigger an update).
s2Config.ProtocolVersion -= 1
s2Config.ProtocolVersion--
s2Config.Tags["foo"] = "bar"

// We try for a little while to wait for s2 to fully shutdown since the
Expand Down Expand Up @@ -1475,31 +1673,31 @@ func TestSerf_SetTags(t *testing.T) {

// Verify the new tags
m1m := s1.Members()
m1m_tags := make(map[string]map[string]string)
m1mTags := make(map[string]map[string]string)
for _, m := range m1m {
m1m_tags[m.Name] = m.Tags
m1mTags[m.Name] = m.Tags
}

if m := m1m_tags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1m_tags)
if m := m1mTags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1mTags)
}

if m := m1m_tags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1m_tags)
if m := m1mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1mTags)
}

m2m := s2.Members()
m2m_tags := make(map[string]map[string]string)
m2mTags := make(map[string]map[string]string)
for _, m := range m2m {
m2m_tags[m.Name] = m.Tags
m2mTags[m.Name] = m.Tags
}

if m := m2m_tags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1m_tags)
if m := m2mTags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1mTags)
}

if m := m2m_tags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1m_tags)
if m := m2mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1mTags)
}
}

Expand Down

0 comments on commit 1d3fdf9

Please sign in to comment.