Skip to content

Commit

Permalink
Merge pull request #614 from hashicorp/feature/per-node-reconnect-tim…
Browse files Browse the repository at this point in the history
…eout
  • Loading branch information
mkeeler committed Oct 7, 2020
2 parents 9309b94 + 943cc43 commit 7faa1b0
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 30 deletions.
4 changes: 2 additions & 2 deletions coordinate/performance_test.go
Expand Up @@ -75,7 +75,7 @@ func TestPerformance_Height(t *testing.T) {

// Make sure the height looks reasonable with the regular nodes all in a
// plane, and the center node up above.
for i, _ := range clients {
for i := range clients {
coord := clients[i].GetCoordinate()
if i == 0 {
if coord.Height < 0.97*radius.Seconds() {
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestPerformance_Drift(t *testing.T) {
}

mid := make([]float64, config.Dimensionality)
for i, _ := range mid {
for i := range mid {
mid[i] = min.Vec[i] + (max.Vec[i]-min.Vec[i])/2
}
return magnitude(mid)
Expand Down
4 changes: 2 additions & 2 deletions coordinate/phantom.go
Expand Up @@ -11,7 +11,7 @@ import (
// given config.
func GenerateClients(nodes int, config *Config) ([]*Client, error) {
clients := make([]*Client, nodes)
for i, _ := range clients {
for i := range clients {
client, err := NewClient(config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -146,7 +146,7 @@ func Simulate(clients []*Client, truth [][]time.Duration, cycles int) {

nodes := len(clients)
for cycle := 0; cycle < cycles; cycle++ {
for i, _ := range clients {
for i := range clients {
if j := rand.Intn(nodes); j != i {
c := clients[j].GetCoordinate()
rtt := truth[i][j]
Expand Down
2 changes: 1 addition & 1 deletion coordinate/util_test.go
Expand Up @@ -21,7 +21,7 @@ func verifyEqualVectors(t *testing.T, vec1 []float64, vec2 []float64) {
t.Fatalf("vector length mismatch, %d != %d", len(vec1), len(vec2))
}

for i, _ := range vec1 {
for i := range vec1 {
verifyEqualFloats(t, vec1[i], vec2[i])
}
}
4 changes: 4 additions & 0 deletions serf/config.go
Expand Up @@ -254,6 +254,10 @@ type Config struct {
// WARNING: this should ONLY be used in tests
messageDropper func(typ messageType) bool

// ReconnectTimeoutOverride is an optional interface which, when present, allows
// the application to cause reaping of a node to happen when it otherwise wouldn't.
ReconnectTimeoutOverride ReconnectTimeoutOverrider

// ValidateNodeNames controls whether nodenames only
// contain alphanumeric, dashes and '.' characters
// and sets maximum length to 128 characters
Expand Down
21 changes: 3 additions & 18 deletions serf/merge_delegate.go
Expand Up @@ -3,7 +3,6 @@ package serf
import (
"fmt"
"net"
"strconv"

"github.com/hashicorp/memberlist"
)
Expand Down Expand Up @@ -61,26 +60,12 @@ func (m *mergeDelegate) nodeToMember(n *memberlist.Node) (*Member, error) {

// validateMemberInfo checks that the data we are sending is valid
func (m *mergeDelegate) validateMemberInfo(n *memberlist.Node) error {
if err := m.serf.ValidateNodeNames(); err != nil {
if err := m.serf.validateNodeName(n.Name); err != nil {
return err
}

host, port, err := net.SplitHostPort(string(n.Addr))
if err != nil {
return err
}

ip := net.ParseIP(host)
if ip == nil || (ip.To4() == nil && ip.To16() == nil) {
return fmt.Errorf("%v is not a valid IPv4 or IPv6 address\n", ip)
}

p, err := strconv.Atoi(port)
if err != nil {
return err
}
if p < 0 || p > 65535 {
return fmt.Errorf("invalid port %v , port must be a valid number from 0-65535", p)
if len(n.Addr) != 4 && len(n.Addr) != 16 {
return fmt.Errorf("IP byte length is invalid: %d bytes is not either 4 or 16", len(n.Addr))
}

if len(n.Meta) > memberlist.MetaMaxSize {
Expand Down
102 changes: 102 additions & 0 deletions serf/merge_delegate_test.go
@@ -0,0 +1,102 @@
package serf

import (
"net"
"strings"
"testing"

"github.com/hashicorp/memberlist"
)

func TestValidateMemberInfo(t *testing.T) {
type testCase struct {
name string
addr net.IP
meta []byte
validateNodeNames bool
err string
}

cases := map[string]testCase{
"invalid-name-chars": {
name: "space not allowed",
addr: []byte{1, 2, 3, 4},
validateNodeNames: true,
err: "Node name contains invalid characters",
},
"invalid-name-chars-not-validated": {
name: "space not allowed",
addr: []byte{1, 2, 3, 4},
validateNodeNames: false,
},
"invalid-name-len": {
name: strings.Repeat("abcd", 33),
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
validateNodeNames: true,
err: "Node name is 132 characters.", // 33 * 4
},
"invalid-name-len-not-validated": {
name: strings.Repeat("abcd", 33),
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
validateNodeNames: false,
},
"invalid-ip": {
name: "test",
addr: []byte{1, 2}, // length has to be 4 or 16
err: "IP byte length is invalid",
},
"invalid-ip-2": {
name: "test",
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}, // length has to be 4 or 16
err: "IP byte length is invalid",
},
"meta-too-long": {
name: "test",
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
meta: []byte(strings.Repeat("a", 513)),
err: "Encoded length of tags exceeds limit",
},
"ipv4-okay": {
name: "test",
addr: []byte{1, 1, 1, 1},
},
"ipv6-okay": {
name: "test",
addr: []byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
},
}

for name, tcase := range cases {
t.Run(name, func(t *testing.T) {

delegate := mergeDelegate{
serf: &Serf{
config: &Config{
ValidateNodeNames: tcase.validateNodeNames,
},
},
}

node := &memberlist.Node{
Name: tcase.name,
Addr: tcase.addr,
Meta: tcase.meta,
}

err := delegate.validateMemberInfo(node)

if tcase.err == "" {
if err != nil {
t.Fatalf("Encountered an unexpected error when validating member info: %v", err)
}
} else {
if err == nil {
t.Fatalf("Did not encounter the expected error of %q", tcase.err)
}
if !strings.Contains(err.Error(), tcase.err) {
t.Fatalf("Member info validation failed with a different error than expected. Expected: %q, Actual: %q", tcase.err, err.Error())
}
}
})
}
}
29 changes: 22 additions & 7 deletions serf/serf.go
Expand Up @@ -50,6 +50,12 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// ReconnectTimeoutOverrider is an interface that can be implemented to allow overriding
// the reconnect timeout for individual members.
//
// ReconnectTimeout is handed the member under consideration together with the
// timeout that would otherwise apply, and returns the timeout to use for that
// member.
type ReconnectTimeoutOverrider interface {
ReconnectTimeout(member *Member, timeout time.Duration) time.Duration
}

// Serf is a single node that is part of a single cluster that gets
// events about joins/leaves/failures/etc. It is created with the Create
// method.
Expand Down Expand Up @@ -1577,8 +1583,13 @@ func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []
for i := 0; i < n; i++ {
m := old[i]

memberTimeout := timeout
if s.config.ReconnectTimeoutOverride != nil {
memberTimeout = s.config.ReconnectTimeoutOverride.ReconnectTimeout(&m.Member, memberTimeout)
}

// Skip if the timeout is not yet reached
if now.Sub(m.leaveTime) <= timeout {
if now.Sub(m.leaveTime) <= memberTimeout {
continue
}

Expand Down Expand Up @@ -1894,15 +1905,19 @@ func (s *Serf) NumNodes() (numNodes int) {
// ValidateNodeNames verifies the NodeName contains
// only alphanumeric, -, or . and is under 128 characters
func (s *Serf) ValidateNodeNames() error {
return s.validateNodeName(s.config.NodeName)
}

func (s *Serf) validateNodeName(name string) error {
if s.config.ValidateNodeNames {
var InvalidNameRe = regexp.MustCompile(`[^A-Za-z0-9\-\.]+`)
if InvalidNameRe.MatchString(s.config.NodeName) {
return fmt.Errorf("NodeName contains invalid characters %v , Valid characters include "+
"all alpha-numerics and dashes and '.' ", s.config.NodeName)
if InvalidNameRe.MatchString(name) {
return fmt.Errorf("Node name contains invalid characters %v , Valid characters include "+
"all alpha-numerics and dashes and '.' ", name)
}
if len(s.config.NodeName) > MaxNodeNameLength {
return fmt.Errorf("NodeName is %v characters. "+
"Valid length is between 1 and 128 characters", len(s.config.NodeName))
if len(name) > MaxNodeNameLength {
return fmt.Errorf("Node name is %v characters. "+
"Valid length is between 1 and 128 characters", len(name))
}
}
return nil
Expand Down
65 changes: 65 additions & 0 deletions serf/serf_test.go
Expand Up @@ -2999,3 +2999,68 @@ func TestSerf_ValidateNodeName(t *testing.T) {
}

}

// reconnectOverride is a test double whose ReconnectTimeout method returns a
// fixed timeout and records that it was invoked.
type reconnectOverride struct {
// timeout is the value handed back by every ReconnectTimeout call.
timeout time.Duration
// called is set to true whenever ReconnectTimeout is invoked.
called bool
}

// ReconnectTimeout marks the override as having been consulted and returns the
// fixed timeout, ignoring both the member and the timeout passed in.
func (r *reconnectOverride) ReconnectTimeout(_ *Member, _ time.Duration) time.Duration {
r.called = true
return r.timeout
}

// TestSerf_perNodeReconnectTimeout creates two Serf instances, joins them,
// shuts the second one down, and then checks that the first one reported
// join, failed and reap events for it and that the ReconnectTimeoutOverride
// configured on the first instance was invoked.
func TestSerf_perNodeReconnectTimeout(t *testing.T) {
ip1, returnFn1 := testutil.TakeIP()
defer returnFn1()

ip2, returnFn2 := testutil.TakeIP()
defer returnFn2()

// The override always answers with 1µs, far below the 30s ReconnectTimeout
// configured on s1 below.
override := reconnectOverride{timeout: 1 * time.Microsecond}

// Create the s1 config with an event channel so we can listen
eventCh := make(chan Event, 4)
s1Config := testConfig(t, ip1)
s1Config.ReconnectTimeout = 30 * time.Second
s1Config.ReconnectTimeoutOverride = &override
s1Config.EventCh = eventCh

// s2 keeps the plain test configuration; only s1 carries the override.
s2Config := testConfig(t, ip2)

s1, err := Create(s1Config)
if err != nil {
t.Fatalf("err: %v", err)
}
defer s1.Shutdown()

s2, err := Create(s2Config)
if err != nil {
t.Fatalf("err: %v", err)
}
defer s2.Shutdown()

// NOTE(review): presumably waits until each listed instance reports the
// given member count — confirm against the waitUntilNumNodes helper.
waitUntilNumNodes(t, 1, s1, s2)

// Join s1 to s2, addressing s2 as "<node name>/<bind address>".
_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
if err != nil {
t.Fatalf("err: %v", err)
}

waitUntilNumNodes(t, 2, s1, s2)

// Shut s2 down explicitly; the deferred s2.Shutdown above will run again
// when the test returns.
err = s2.Shutdown()
if err != nil {
t.Fatalf("err: %v", err)
}

waitUntilNumNodes(t, 1, s1)

// Since s2 shut down, check the events s1 saw for it: a join, then a
// failure, then a reap.
testEvents(t, eventCh, s2Config.NodeName,
[]EventType{EventMemberJoin, EventMemberFailed, EventMemberReap})

// The override must have been consulted at least once.
if !override.called {
t.Fatalf("The reconnect override was not used")
}
}

0 comments on commit 7faa1b0

Please sign in to comment.