Skip to content

Commit

Permalink
Handle ip changes (#1446)
Browse files Browse the repository at this point in the history
* consul like approach: add server_lookup to make dkron independent from server node IP on raft layer
* handle memberupdate event
* query other servers before start & add test
* don't remove itself if node is a raft leader
* don't remove dkron server node if id matches
  • Loading branch information
ivan-kripakov-m10 committed Feb 3, 2024
1 parent 2f40b60 commit 1c45fdf
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 17 deletions.
25 changes: 18 additions & 7 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ type Agent struct {

// peers is used to track the known Dkron servers. This is
// used for region forwarding and clustering.
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex
serverLookup *ServerLookup

activeExecutions sync.Map

Expand All @@ -142,8 +143,9 @@ type AgentOption func(agent *Agent)
// and running a Dkron instance.
func NewAgent(config *Config, options ...AgentOption) *Agent {
agent := &Agent{
config: config,
retryJoinCh: make(chan error),
config: config,
retryJoinCh: make(chan error),
serverLookup: NewServerLookup(),
}

for _, option := range options {
Expand Down Expand Up @@ -316,7 +318,13 @@ func (a *Agent) setupRaft() error {
logger = a.logger.Logger.Writer()
}

transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger)
transConfig := &raft.NetworkTransportConfig{
Stream: a.raftLayer,
MaxPool: 3,
Timeout: raftTimeout,
ServerAddressProvider: a.serverLookup,
}
transport := raft.NewNetworkTransportWithConfig(transConfig)
a.raftTransport = transport

config := raft.DefaultConfig()
Expand Down Expand Up @@ -710,7 +718,10 @@ func (a *Agent) eventLoop() {
a.localMemberEvent(me)
case serf.EventMemberReap:
a.localMemberEvent(me)
case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore
case serf.EventMemberUpdate:
a.lanNodeUpdate(me)
a.localMemberEvent(me)
case serf.EventUser, serf.EventQuery: // Ignore
default:
a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
}
Expand Down
2 changes: 1 addition & 1 deletion dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var (
logLevel = "error"
logLevel = "info"
)

func TestAgentCommand_runForElection(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dkron
import (
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -284,17 +285,12 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error {
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
return nil
}
future := a.raft.RemoveServer(server.ID, 0, 0)
if server.Address == raft.ServerAddress(addr) {
future := a.raft.RemoveServer(server.ID, 0, 0)
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
}
a.logger.WithField("server", server.Address).Info("dkron: removed server with duplicate address")
} else {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
}
a.logger.WithField("server", server.ID).Info("dkron: removed server with duplicate ID")
}
}
}
Expand All @@ -315,6 +311,15 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error {
// removeRaftPeer is used to remove a Raft peer when a dkron server leaves
// or is reaped
func (a *Agent) removeRaftPeer(m serf.Member, parts *ServerParts) error {

// Do not remove ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if strings.EqualFold(m.Name, a.config.NodeName) {
a.logger.Warn("removing self should be done by follower", "name", a.config.NodeName)
return nil
}

// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
Expand Down
63 changes: 60 additions & 3 deletions dkron/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dkron

import (
"strings"
"time"

"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
Expand All @@ -11,6 +12,9 @@ const (
// StatusReap is used to update the status of a node if we
// are handling a EventMemberReap
StatusReap = serf.MemberStatus(-1)

// maxPeerRetries limits how many invalidate attempts are made
maxPeerRetries = 6
)

// nodeJoin is used to handle join events on the serf cluster
Expand All @@ -21,8 +25,8 @@ func (a *Agent) nodeJoin(me serf.MemberEvent) {
a.logger.WithField("member", m.Name).Warn("non-server in gossip pool")
continue
}
a.logger.WithField("server", parts.Name).Info("adding server")

a.logger.WithField("server", parts.Name).Info("Adding LAN adding server")
a.serverLookup.AddServer(parts)
// Check if this server is known
found := false
a.peerLock.Lock()
Expand Down Expand Up @@ -110,7 +114,46 @@ func (a *Agent) maybeBootstrap() {
return
}

// TODO: Query each of the servers and make sure they report no Raft peers.
// Query each of the servers and make sure they report no Raft peers.
for _, server := range servers {
var peers []string

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String())
if err != nil {
nextRetry := (1 << attempt) * time.Second
a.logger.Error("Failed to confirm peer status for server (will retry).",
"server", server.Name,
"retry_interval", nextRetry.String(),
"error", err,
)
time.Sleep(nextRetry)
} else {
for _, peer := range configuration.Servers {
peers = append(peers, peer.Id)
}
break
}
}

// Found a node with some Raft peers, stop bootstrap since there's
// evidence of an existing cluster. We should get folded in by the
// existing servers if that's the case, so it's cleaner to sit as a
// candidate with no peers so we don't cause spurious elections.
// It's OK this is racy, because even with an initial bootstrap
// as long as one peer runs bootstrap things will work, and if we
// have multiple peers bootstrap in the same way, that's OK. We
// just don't want a server added much later to do a live bootstrap
// and interfere with the cluster. This isn't required for Raft's
// correctness because no server in the existing cluster will vote
// for this server, but it makes things much more stable.
if len(peers) > 0 {
a.logger.Info("Existing Raft peers reported by server, disabling bootstrap mode", "server", server.Name)
a.config.BootstrapExpect = 0
return
}
}

// Update the peer set
// Attempt a live bootstrap!
Expand Down Expand Up @@ -174,6 +217,7 @@ func (a *Agent) nodeFailed(me serf.MemberEvent) {
delete(a.localPeers, raft.ServerAddress(parts.Addr.String()))
}
a.peerLock.Unlock()
a.serverLookup.RemoveServer(parts)
}
}

Expand All @@ -200,3 +244,16 @@ func (a *Agent) localMemberEvent(me serf.MemberEvent) {
}
}
}

func (a *Agent) lanNodeUpdate(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := isServer(m)
if !ok {
continue
}
a.logger.WithField("server", parts.String()).Info("Updating LAN server")

// Update server lookup
a.serverLookup.AddServer(parts)
}
}
76 changes: 76 additions & 0 deletions dkron/server_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package dkron

import (
"fmt"
"sync"

"github.com/hashicorp/raft"
)

// ServerLookup encapsulates looking up servers by id and address
type ServerLookup struct {
lock sync.RWMutex
addressToServer map[raft.ServerAddress]*ServerParts
idToServer map[raft.ServerID]*ServerParts
}

func NewServerLookup() *ServerLookup {
return &ServerLookup{
lock: sync.RWMutex{},
addressToServer: make(map[raft.ServerAddress]*ServerParts),
idToServer: make(map[raft.ServerID]*ServerParts),
}
}

func (sl *ServerLookup) AddServer(server *ServerParts) {
sl.lock.Lock()
defer sl.lock.Unlock()
sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server
sl.idToServer[raft.ServerID(server.ID)] = server
}

func (sl *ServerLookup) RemoveServer(server *ServerParts) {
sl.lock.Lock()
defer sl.lock.Unlock()
delete(sl.addressToServer, raft.ServerAddress(server.RPCAddr.String()))
delete(sl.idToServer, raft.ServerID(server.ID))
}

// Implements the ServerAddressProvider interface
func (sl *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
sl.lock.RLock()
defer sl.lock.RUnlock()
svr, ok := sl.idToServer[id]
if !ok {
return "", fmt.Errorf("Could not find address for server id %v", id)
}
return raft.ServerAddress(svr.RPCAddr.String()), nil
}

// Server looks up the server by address, returns a boolean if not found
func (sl *ServerLookup) Server(addr raft.ServerAddress) *ServerParts {
sl.lock.RLock()
defer sl.lock.RUnlock()
return sl.addressToServer[addr]
}

func (sl *ServerLookup) Servers() []*ServerParts {
sl.lock.RLock()
defer sl.lock.RUnlock()
var ret []*ServerParts
for _, svr := range sl.addressToServer {
ret = append(ret, svr)
}
return ret
}

func (sl *ServerLookup) CheckServers(fn func(srv *ServerParts) bool) {
sl.lock.RLock()
defer sl.lock.RUnlock()

for _, srv := range sl.addressToServer {
if !fn(srv) {
return
}
}
}
91 changes: 91 additions & 0 deletions dkron/server_lookup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package dkron

import (
"testing"

"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
)

type testAddr struct {
addr string
}

func (ta *testAddr) Network() string {
return "tcp"
}

func (ta *testAddr) String() string {
return ta.addr
}

func TestAddServer(t *testing.T) {
// arrange
lookup := NewServerLookup()
id1, addr1 := "server-1", "127.0.0.1:8300"
id2, addr2 := "server-2", "127.0.0.2:8300"
server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2)

// act
lookup.AddServer(server1)
lookup.AddServer(server2)

// assert
servers := lookup.Servers()
expectedServers := []*ServerParts{server1, server2}
require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)

got, err := lookup.ServerAddr(raft.ServerID(id1))
require.NoErrorf(t, err, "Unexpected error: %v", err)
require.EqualValuesf(t, addr1, string(got), "Expected %v but got %v", addr1, got)

server := lookup.Server(raft.ServerAddress(addr1))
strAddr := server.RPCAddr.String()
require.EqualValuesf(t, addr1, strAddr, "Expected lookup to return address %v but got %v", addr1, strAddr)

got, err = lookup.ServerAddr(raft.ServerID(id2))
require.NoErrorf(t, err, "Unexpected error: %v", err)
require.EqualValuesf(t, addr2, string(got), "Expected %v but got %v", addr2, got)

server = lookup.Server(raft.ServerAddress(addr2))
strAddr = server.RPCAddr.String()
require.EqualValuesf(t, addr2, strAddr, "Expected lookup to return address %v but got %v", addr2, strAddr)
}

func TestRemoveServer(t *testing.T) {
// arrange
lookup := NewServerLookup()
id1, addr1 := "server-1", "127.0.0.1:8300"
id2, addr2 := "server-2", "127.0.0.2:8300"
server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2)
lookup.AddServer(server1)
lookup.AddServer(server2)

// act
lookup.RemoveServer(server1)

// assert
servers := lookup.Servers()
expectedServers := []*ServerParts{server2}
require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)

require.Nilf(t, lookup.Server(raft.ServerAddress(addr1)), "Expected lookup to return nil")
addr, err := lookup.ServerAddr(raft.ServerID(id1))
require.Errorf(t, err, "Expected lookup to return error")
require.EqualValuesf(t, "", string(addr), "Expected empty address but got %v", addr)

got, err := lookup.ServerAddr(raft.ServerID(id2))
require.NoErrorf(t, err, "Unexpected error: %v", err)
require.EqualValuesf(t, addr2, string(got), "Expected %v but got %v", addr2, got)

server := lookup.Server(raft.ServerAddress(addr2))
strAddr := server.RPCAddr.String()
require.EqualValuesf(t, addr2, strAddr, "Expected lookup to return address %v but got %v", addr2, strAddr)
}

func buildServerParts(id, addr string) *ServerParts {
return &ServerParts{
ID: id,
RPCAddr: &testAddr{addr},
}
}

0 comments on commit 1c45fdf

Please sign in to comment.