From e58bcf410672611b082aecc0e2e0c93438d4560c Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sat, 4 Jan 2014 16:18:27 -0600 Subject: [PATCH 01/14] discoverd/client: set.Wait() now just returns a channel. set.Watch() lets you specify one-off watch option, client.Services() now takes an explicit timeout --- client.go | 41 ++++++++++++++++++----------------------- client_test.go | 8 ++++---- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/client.go b/client.go index 3a172f421c..ad21215afc 100644 --- a/client.go +++ b/client.go @@ -12,8 +12,6 @@ import ( "github.com/flynn/rpcplus" ) -var WaitTimeoutSecs = 10 - type Service struct { Created uint Name string @@ -26,7 +24,7 @@ type Service struct { type ServiceSet struct { services map[string]*Service filters map[string]string - watches map[chan *agent.ServiceUpdate]struct{} + watches map[chan *agent.ServiceUpdate]bool serMutex sync.Mutex filMutex sync.Mutex watchMutex sync.Mutex @@ -46,7 +44,7 @@ func makeServiceSet(call *rpcplus.Call) *ServiceSet { return &ServiceSet{ services: make(map[string]*Service), filters: make(map[string]string), - watches: make(map[chan *agent.ServiceUpdate]struct{}), + watches: make(map[chan *agent.ServiceUpdate]bool), call: call, } } @@ -99,8 +97,11 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { s.watchMutex.Lock() defer s.watchMutex.Unlock() - for ch := range s.watches { + for ch, once := range s.watches { ch <- update + if once { + delete(s.watches, ch) + } } } @@ -185,10 +186,10 @@ func (s *ServiceSet) Filter(attrs map[string]string) { } } -func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool) { +func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool, fireOnce bool) { s.watchMutex.Lock() defer s.watchMutex.Unlock() - s.watches[ch] = struct{}{} + s.watches[ch] = fireOnce if bringCurrent { go func() { s.serMutex.Lock() @@ -212,16 +213,10 @@ func (s *ServiceSet) Unwatch(ch chan *agent.ServiceUpdate) { delete(s.watches, ch) } -func (s *ServiceSet) Wait() (*agent.ServiceUpdate, error) { +func (s *ServiceSet) Wait() chan *agent.ServiceUpdate { updateCh := make(chan *agent.ServiceUpdate, 1024) // buffer because of Watch bringCurrent race bug - s.Watch(updateCh, true) - defer s.Unwatch(updateCh) - select { - case update := <-updateCh: - return update, nil - case <-time.After(time.Duration(WaitTimeoutSecs) * time.Second): - return nil, errors.New("discover: wait timeout exceeded") - } + s.Watch(updateCh, true, true) + return updateCh } func (s *ServiceSet) Close() error { @@ -260,18 +255,18 @@ func (c *Client) ServiceSet(name string) (*ServiceSet, error) { return set, nil } -func (c *Client) Services(name string) ([]*Service, error) { +func (c *Client) Services(name string, timeout int) ([]*Service, error) { set, err := c.ServiceSet(name) if err != nil { return nil, err } - _, err = set.Wait() - if err != nil { - return nil, err + defer set.Close() + select { + case <-set.Wait(): + return set.Services(), nil + case <-time.After(time.Duration(timeout) * time.Second): + return nil, errors.New("discover: wait timeout exceeded") } - set.Close() - return set.Services(), nil - } func (c *Client) Register(name, addr string) error { diff --git a/client_test.go b/client_test.go index 450d935ac9..f4c728a5b2 100644 --- a/client_test.go +++ b/client_test.go @@ -167,7 +167,7 @@ func TestClient(t *testing.T) { // Test client.Services - services, err := client.Services(serviceName) + services, err := client.Services(serviceName, 1) if err != nil { t.Fatal("Unable to get services:", err) } @@ -179,7 +179,7 @@ func TestClient(t *testing.T) { set, _ = client.ServiceSet(serviceName) updates := make(chan *agent.ServiceUpdate) - set.Watch(updates, true) + set.Watch(updates, true, false) err = client.Register(serviceName, ":5555") if err != nil { t.Fatal("Registering service failed", err) @@ -234,7 +234,7 @@ func TestServiceAgeAndLeader(t *testing.T) { if err != nil { t.Fatal("Registering service failed", err.Error()) } - services, _ := client.Services(serviceName) + services, _ := client.Services(serviceName, 1) if len(services) < 1 { t.Fatal("Registered service not online") } @@ -246,7 +246,7 @@ func TestServiceAgeAndLeader(t *testing.T) { if err != nil { t.Fatal("Registering service failed", err.Error()) } - services, _ = client.Services(serviceName) + services, _ = client.Services(serviceName, 1) if len(services) < 2 { t.Fatal("Registered services not online") } From a16c4d057d4a0623d7c8e9a0acc52b363d75954e Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sat, 4 Jan 2014 17:55:54 -0600 Subject: [PATCH 02/14] discoverd/client: simplified mutexes. added initial RegisterWithSet, which required addition of Ignore method. refactored tests to be separate tests, show discoverd output if DEBUG env var is set, cleanup up error handling with an assert function, and a simpler setup/cleanup system --- client.go | 106 ++++++++++++++---------- client_test.go | 213 +++++++++++++++++++++++++++++-------------------- 2 files changed, 191 insertions(+), 128 deletions(-) diff --git a/client.go b/client.go index ad21215afc..de9168d2c0 100644 --- a/client.go +++ b/client.go @@ -22,13 +22,12 @@ type Service struct { } type ServiceSet struct { - services map[string]*Service - filters map[string]string - watches map[chan *agent.ServiceUpdate]bool - serMutex sync.Mutex - filMutex sync.Mutex - watchMutex sync.Mutex - call *rpcplus.Call + sync.Mutex + services map[string]*Service + filters map[string]string + watches map[chan *agent.ServiceUpdate]bool + ignores map[string]struct{} + call *rpcplus.Call } func copyService(service *Service) *Service { @@ -45,6 +44,7 @@ func makeServiceSet(call *rpcplus.Call) *ServiceSet { services: make(map[string]*Service), filters: make(map[string]string), watches: make(map[chan *agent.ServiceUpdate]bool), + ignores: make(map[string]struct{}), call: call, } } @@ -61,12 +61,12 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { isCurrent = true continue } + s.Lock() if s.filters != nil && !s.matchFilters(update.Attrs) { + s.Unlock() continue } - - s.serMutex.Lock() - if update.Online { + if _, ignore := s.ignores[update.Addr]; !ignore && update.Online { if _, exists := s.services[update.Addr]; !exists { host, port, _ := net.SplitHostPort(update.Addr) s.services[update.Addr] = &Service{ @@ -82,11 +82,11 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { if _, exists := s.services[update.Addr]; exists { delete(s.services, update.Addr) } else { - s.serMutex.Unlock() + s.Unlock() continue } } - s.serMutex.Unlock() + s.Unlock() s.updateWatches(update) } s.closeWatches() @@ -95,8 +95,8 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { } func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() + s.Lock() + defer s.Unlock() for ch, once := range s.watches { ch <- update if once { @@ -106,16 +106,14 @@ func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { } func (s *ServiceSet) closeWatches() { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() + s.Lock() + defer s.Unlock() for ch := range s.watches { close(ch) } } func (s *ServiceSet) matchFilters(attrs map[string]string) bool { - s.filMutex.Lock() - defer s.filMutex.Unlock() for key, value := range s.filters { if attrs[key] != value { return false @@ -140,8 +138,8 @@ func (s *ServiceSet) Leader() *Service { } func (s *ServiceSet) Services() []*Service { - s.serMutex.Lock() - defer s.serMutex.Unlock() + s.Lock() + defer s.Unlock() list := make([]*Service, 0, len(s.services)) for _, service := range s.services { list = append(list, copyService(service)) @@ -158,8 +156,8 @@ func (s *ServiceSet) Addrs() []string { } func (s *ServiceSet) Select(attrs map[string]string) []*Service { - s.serMutex.Lock() - defer s.serMutex.Unlock() + s.Lock() + defer s.Unlock() list := make([]*Service, 0, len(s.services)) outer: for _, service := range s.services { @@ -174,11 +172,9 @@ outer: } func (s *ServiceSet) Filter(attrs map[string]string) { - s.filMutex.Lock() + s.Lock() + defer s.Unlock() s.filters = attrs - s.filMutex.Unlock() - s.serMutex.Lock() - defer s.serMutex.Unlock() for key, service := range s.services { if !s.matchFilters(service.Attrs) { delete(s.services, key) @@ -186,14 +182,25 @@ func (s *ServiceSet) Filter(attrs map[string]string) { } } +func (s *ServiceSet) Ignore(addr string) { + s.Lock() + defer s.Unlock() + s.ignores[addr] = struct{}{} + for key, service := range s.services { + if service.Addr == addr { + delete(s.services, key) + } + } +} + func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool, fireOnce bool) { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() + s.Lock() s.watches[ch] = fireOnce + s.Unlock() if bringCurrent { go func() { - s.serMutex.Lock() - defer s.serMutex.Unlock() + s.Lock() + defer s.Unlock() for _, service := range s.services { ch <- &agent.ServiceUpdate{ Name: service.Name, @@ -208,8 +215,8 @@ func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool, fire } func (s *ServiceSet) Unwatch(ch chan *agent.ServiceUpdate) { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() + s.Lock() + defer s.Unlock() delete(s.watches, ch) } @@ -224,9 +231,10 @@ func (s *ServiceSet) Close() error { } type Client struct { - client *rpcplus.Client - heartbeats map[string]chan struct{} - hbMutex sync.Mutex + sync.Mutex + client *rpcplus.Client + heartbeats map[string]chan struct{} + expandedAddrs map[string]string } func NewClient() (*Client, error) { @@ -240,8 +248,9 @@ func NewClient() (*Client, error) { func NewClientUsingAddress(addr string) (*Client, error) { client, err := rpcplus.DialHTTP("tcp", addr) return &Client{ - client: client, - heartbeats: make(map[string]chan struct{}), + client: client, + heartbeats: make(map[string]chan struct{}), + expandedAddrs: make(map[string]string), }, err } @@ -273,6 +282,20 @@ func (c *Client) Register(name, addr string) error { return c.RegisterWithAttributes(name, addr, nil) } +func (c *Client) RegisterWithSet(name, addr string, attributes map[string]string) (*ServiceSet, error) { + err := c.RegisterWithAttributes(name, addr, attributes) + if err != nil { + return nil, err + } + set, err := c.ServiceSet(name) + if err != nil { + c.Unregister(name, addr) + return nil, err + } + set.Ignore(c.expandedAddrs[addr]) + return set, nil +} + func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string]string) error { args := &agent.Args{ Name: name, @@ -285,9 +308,10 @@ func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string return errors.New("discover: register failed: " + err.Error()) } done := make(chan struct{}) - c.hbMutex.Lock() + c.Lock() c.heartbeats[args.Addr] = done - c.hbMutex.Unlock() + c.expandedAddrs[args.Addr] = ret + c.Unlock() go func() { ticker := time.NewTicker(agent.HeartbeatIntervalSecs * time.Second) // TODO: add jitter defer ticker.Stop() @@ -312,10 +336,10 @@ func (c *Client) Unregister(name, addr string) error { Name: name, Addr: addr, } - c.hbMutex.Lock() + c.Lock() close(c.heartbeats[args.Addr]) delete(c.heartbeats, args.Addr) - c.hbMutex.Unlock() + c.Unlock() err := c.client.Call("Agent.Unregister", args, &struct{}{}) if err != nil { return errors.New("discover: unregister failed: " + err.Error()) diff --git a/client_test.go b/client_test.go index f4c728a5b2..79bead1b25 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,8 @@ package discoverd import ( + "bufio" + "log" "math/rand" "os" "os/exec" @@ -51,9 +53,18 @@ func runDiscoverdServer() func() { doneCh := make(chan struct{}) go func() { cmd := exec.Command("discoverd") + stderr, _ := cmd.StderrPipe() if err := cmd.Start(); err != nil { panic(err) } + if os.Getenv("DEBUG") != "" { + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + log.Println("discoverd:", scanner.Text()) + } + }() + } cmdDone := make(chan error) go func() { cmdDone <- cmd.Wait() @@ -76,115 +87,156 @@ func runDiscoverdServer() func() { } } -func TestClient(t *testing.T) { +func setup(t *testing.T) (*Client, func()) { killEtcd := runEtcdServer() - defer killEtcd() killDiscoverd := runDiscoverdServer() - defer killDiscoverd() - client, err := NewClient() if err != nil { t.Fatal(err) } - serviceName := "testService" - - // Test Register and ServiceSet with attributes - - err = client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) + return client, func() { + killDiscoverd() + killEtcd() } - err = client.Register(serviceName, ":2222") +} + +func assert(err error, t *testing.T) error { if err != nil { - t.Fatal("Registering service failed", err.Error()) + t.Fatal("Unexpected error:", err.Error()) } - set, _ := client.ServiceSet(serviceName) + return err +} + +func TestBasicRegisterAndServiceSet(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "basicTest" + + assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}), t) + assert(client.Register(serviceName, ":2222"), t) + + set, err := client.ServiceSet(serviceName) + assert(err, t) + if len(set.Services()) < 2 { t.Fatal("Registered services not online") } - err = client.Unregister(serviceName, ":2222") - if err != nil { - t.Fatal("Unregistering service failed", err.Error()) - } + assert(client.Unregister(serviceName, ":2222"), t) if len(set.Services()) != 1 { t.Fatal("Only 1 registered service should be left") } + if set.Services()[0].Attrs["foo"] != "bar" { t.Fatal("Attribute not set on service as 'bar'") } - // Test Re-register + assert(set.Close(), t) +} - err = client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "baz"}) - if err != nil { - t.Fatal("Re-registering service failed", err.Error()) - } +func TestNewAttributes(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "attributeTest" + + set, err := client.ServiceSet(serviceName) + assert(err, t) + + assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}), t) + assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "baz"}), t) + + <-set.Wait() if set.Services()[0].Attrs["foo"] != "baz" { t.Fatal("Attribute not set on re-registered service as 'baz'") } - err = client.RegisterWithAttributes(serviceName, ":2222", map[string]string{"foo": "qux", "id": "2"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } + assert(set.Close(), t) +} + +func TestFiltering(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() - // Test Filter + serviceName := "filterTest" + + set, err := client.ServiceSet(serviceName) + assert(err, t) + + assert(client.Register(serviceName, ":1111"), t) + assert(client.RegisterWithAttributes(serviceName, ":2222", map[string]string{"foo": "qux", "id": "2"}), t) set.Filter(map[string]string{"foo": "qux"}) if len(set.Services()) > 1 { t.Fatal("Filter not limiting online services in set") } - err = client.RegisterWithAttributes(serviceName, ":3333", map[string]string{"foo": "qux", "id": "3"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } + assert(client.RegisterWithAttributes(serviceName, ":3333", map[string]string{"foo": "qux", "id": "3"}), t) if len(set.Services()) < 2 { t.Fatal("Filter not letting new matching services in set") } - err = client.RegisterWithAttributes(serviceName, ":4444", map[string]string{"foo": "baz"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } + assert(client.RegisterWithAttributes(serviceName, ":4444", map[string]string{"foo": "baz"}), t) if len(set.Services()) > 2 { t.Fatal("Filter not limiting new unmatching services from set") } - // Test Select + assert(set.Close(), t) +} + +func TestSelecting(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "selectTest" + + set, err := client.ServiceSet(serviceName) + assert(err, t) + + assert(client.Register(serviceName, ":1111"), t) + assert(client.RegisterWithAttributes(serviceName, ":2222", map[string]string{"foo": "qux", "id": "2"}), t) + assert(client.RegisterWithAttributes(serviceName, ":3333", map[string]string{"foo": "qux", "id": "3"}), t) if len(set.Select(map[string]string{"id": "3"})) != 1 { t.Fatal("Select not returning proper services") } - // Test Close + assert(set.Close(), t) +} - err = set.Close() - if err != nil { - t.Fatal("Unable to close:", err) - } +func TestServices(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "servicesTest" - // Test client.Services + assert(client.Register(serviceName, ":1111"), t) + assert(client.Register(serviceName, ":2222"), t) services, err := client.Services(serviceName, 1) - if err != nil { - t.Fatal("Unable to get services:", err) - } - if len(services) != 4 { + assert(err, t) + if len(services) != 2 { t.Fatal("Not all registered services were returned:", services) } +} + +func TestWatch(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "watchTest" - // Test Watch with bringCurrent + assert(client.Register(serviceName, ":1111"), t) + assert(client.Register(serviceName, ":2222"), t) + + set, err := client.ServiceSet(serviceName) + assert(err, t) - set, _ = client.ServiceSet(serviceName) updates := make(chan *agent.ServiceUpdate) set.Watch(updates, true, false) - err = client.Register(serviceName, ":5555") - if err != nil { - t.Fatal("Registering service failed", err) - } - for i := 0; i < 5; i++ { + assert(client.Register(serviceName, ":3333"), t) + for i := 0; i < 3; i++ { var update *agent.ServiceUpdate select { case update = <-updates: @@ -199,42 +251,32 @@ func TestClient(t *testing.T) { } } + assert(set.Close(), t) } func TestNoServices(t *testing.T) { - killEtcd := runEtcdServer() - defer killEtcd() - killDiscoverd := runDiscoverdServer() - defer killDiscoverd() + client, cleanup := setup(t) + defer cleanup() - client, err := NewClient() - if err != nil { - t.Fatal(err) - } + set, err := client.ServiceSet("nonexistent") + assert(err, t) - set, _ := client.ServiceSet("nonexistent") if len(set.Services()) != 0 { t.Fatal("There should be no services") } + + assert(set.Close(), t) } func TestServiceAgeAndLeader(t *testing.T) { - killEtcd := runEtcdServer() - defer killEtcd() - killDiscoverd := runDiscoverdServer() - defer killDiscoverd() + client, cleanup := setup(t) + defer cleanup() - client, err := NewClient() - if err != nil { - t.Fatal(err) - } - serviceName := "ageService" + serviceName := "leaderTest" - err = client.Register(serviceName, ":1111") - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } - services, _ := client.Services(serviceName, 1) + assert(client.Register(serviceName, ":1111"), t) + services, err := client.Services(serviceName, 1) + assert(err, t) if len(services) < 1 { t.Fatal("Registered service not online") } @@ -242,11 +284,9 @@ func TestServiceAgeAndLeader(t *testing.T) { t.Fatal("Service has no age") } - err = client.Register(serviceName, ":2222") - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } - services, _ = client.Services(serviceName, 1) + assert(client.Register(serviceName, ":2222"), t) + services, err = client.Services(serviceName, 1) + assert(err, t) if len(services) < 2 { t.Fatal("Registered services not online") } @@ -260,11 +300,9 @@ func TestServiceAgeAndLeader(t *testing.T) { } } - err = client.Register(serviceName, ":3333") - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } - set, _ := client.ServiceSet(serviceName) + assert(client.Register(serviceName, ":3333"), t) + set, err := client.ServiceSet(serviceName) + assert(err, t) if len(set.Services()) < 3 { t.Fatal("Registered services not online") } @@ -272,4 +310,5 @@ func TestServiceAgeAndLeader(t *testing.T) { t.Fatal("Incorrect leader") } + assert(set.Close(), t) } From ab6261be09ad80ce8361c600a20a9220c5cb5745 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sat, 4 Jan 2014 18:21:10 -0600 Subject: [PATCH 03/14] discoverd/client: test for RegisterWithSet --- client_test.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/client_test.go b/client_test.go index 79bead1b25..a181912042 100644 --- a/client_test.go +++ b/client_test.go @@ -268,6 +268,33 @@ func TestNoServices(t *testing.T) { assert(set.Close(), t) } +func TestRegisterWithSet(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "registerWithSetTest" + + assert(client.Register(serviceName, ":1111"), t) + + set, err := client.RegisterWithSet(serviceName, ":2222", nil) + assert(err, t) + + if len(set.Services()) != 1 { + t.Fatal("There should only be one other service") + } + if set.Services()[0].Addr != ":1111" { + t.Fatal("Set contains the wrong service") + } + + assert(set.Close(), t) + + services, err := client.Services(serviceName, 1) + assert(err, t) + if len(services) != 2 { + t.Fatal("Not all registered services were returned:", services) + } +} + func TestServiceAgeAndLeader(t *testing.T) { client, cleanup := setup(t) defer cleanup() From a1ed9e371d74a117d5f6eddbe8cc67d27eb8cd88 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sat, 4 Jan 2014 19:07:53 -0600 Subject: [PATCH 04/14] discoverd/client: set.Services() always returns services sorted by age, so set.Services()[0] is always leader. implemented new behavior for set.Leader() so it returns the leader service as it changes --- client.go | 37 +++++++++++++++++++------ client_test.go | 75 +++++++++++++++++++++++++++++++++----------------- 2 files changed, 77 insertions(+), 35 deletions(-) diff --git a/client.go b/client.go index de9168d2c0..df634226ee 100644 --- a/client.go +++ b/client.go @@ -27,6 +27,7 @@ type ServiceSet struct { filters map[string]string watches map[chan *agent.ServiceUpdate]bool ignores map[string]struct{} + leaders chan *Service call *rpcplus.Call } @@ -122,21 +123,36 @@ func (s *ServiceSet) matchFilters(attrs map[string]string) bool { return true } +func (s *ServiceSet) Leader() chan *Service { + if s.leaders != nil { + return s.leaders + } + s.leaders = make(chan *Service) + updates := make(chan *agent.ServiceUpdate) + s.Watch(updates, false, false) + go func() { + leader := s.Services()[0] + s.leaders <- leader + for { + update := <-updates + if update == nil { + return + } + if !update.Online && update.Addr == leader.Addr { + leader = s.Services()[0] + s.leaders <- leader + } + } + }() + return s.leaders +} + type serviceByAge []*Service func (a serviceByAge) Len() int { return len(a) } func (a serviceByAge) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a serviceByAge) Less(i, j int) bool { return a[i].Created < a[j].Created } -func (s *ServiceSet) Leader() *Service { - services := s.Services() - if len(services) > 0 { - sort.Sort(serviceByAge(services)) - return services[0] - } - return nil -} - func (s *ServiceSet) Services() []*Service { s.Lock() defer s.Unlock() @@ -144,6 +160,9 @@ func (s *ServiceSet) Services() []*Service { for _, service := range s.services { list = append(list, copyService(service)) } + if len(list) > 0 { + sort.Sort(serviceByAge(list)) + } return list } diff --git a/client_test.go b/client_test.go index a181912042..9b4ceb3d32 100644 --- a/client_test.go +++ b/client_test.go @@ -295,45 +295,68 @@ func TestRegisterWithSet(t *testing.T) { } } -func TestServiceAgeAndLeader(t *testing.T) { +func TestServiceAge(t *testing.T) { client, cleanup := setup(t) defer cleanup() - serviceName := "leaderTest" + serviceName := "ageTest" - assert(client.Register(serviceName, ":1111"), t) - services, err := client.Services(serviceName, 1) - assert(err, t) - if len(services) < 1 { - t.Fatal("Registered service not online") - } - if services[0].Created < 1 { - t.Fatal("Service has no age") + checkOldest := func(addr string) { + services, err := client.Services(serviceName, 1) + assert(err, t) + if services[0].Addr != addr { + t.Fatal("Oldest service is not first in Services() slice") + } } + assert(client.Register(serviceName, ":1111"), t) + checkOldest(":1111") assert(client.Register(serviceName, ":2222"), t) - services, err = client.Services(serviceName, 1) + checkOldest(":1111") + assert(client.Register(serviceName, ":3333"), t) + checkOldest(":1111") + assert(client.Register(serviceName, ":4444"), t) + checkOldest(":1111") + assert(client.Unregister(serviceName, ":1111"), t) + checkOldest(":2222") + +} + +func TestLeaderChannel(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "leadersTest" + + assert(client.Register(serviceName, ":1111"), t) + + set, err := client.ServiceSet(serviceName) assert(err, t) - if len(services) < 2 { - t.Fatal("Registered services not online") - } - if services[0].Port == "1111" { - if services[0].Created >= services[1].Created { - t.Fatal("Older service does not have smaller Created value") - } - } else { - if services[1].Created >= services[0].Created { - t.Fatal("Older service does not have smaller Created value") + + var leader *Service + + go func() { + for { + leader = <-set.Leader() } + }() + + assert(client.Register(serviceName, ":2222"), t) + + if leader.Addr != ":1111" { + t.Fatal("Incorrect leader") } assert(client.Register(serviceName, ":3333"), t) - set, err := client.ServiceSet(serviceName) - assert(err, t) - if len(set.Services()) < 3 { - t.Fatal("Registered services not online") + assert(client.Unregister(serviceName, ":1111"), t) + + if leader.Addr != ":2222" { + t.Fatal("Incorrect leader") } - if set.Leader().Port != "1111" { + + assert(client.Unregister(serviceName, ":2222"), t) + + if leader.Addr != ":3333" { t.Fatal("Incorrect leader") } From 3ed59746666adc62f8607b44d3f08c6ca5f3ce7c Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sat, 4 Jan 2014 20:25:21 -0600 Subject: [PATCH 05/14] discoverd/client: added RegisterAndStandby and made sure RegisterWithSet continues to give the right leader even when its the service registered. changed ignores on set to concept of self, which is only used for RegisterWithSet (and therefore RegisterAndStandby) --- client.go | 65 +++++++++++++++++++++++++++++++++++++------------- client_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 106 insertions(+), 18 deletions(-) diff --git a/client.go b/client.go index df634226ee..8535ff3798 100644 --- a/client.go +++ b/client.go @@ -26,9 +26,10 @@ type ServiceSet struct { services map[string]*Service filters map[string]string watches map[chan *agent.ServiceUpdate]bool - ignores map[string]struct{} leaders chan *Service call *rpcplus.Call + self *Service + SelfAddr string } func copyService(service *Service) *Service { @@ -45,7 +46,6 @@ func makeServiceSet(call *rpcplus.Call) *ServiceSet { services: make(map[string]*Service), filters: make(map[string]string), watches: make(map[chan *agent.ServiceUpdate]bool), - ignores: make(map[string]struct{}), call: call, } } @@ -67,7 +67,7 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { s.Unlock() continue } - if _, ignore := s.ignores[update.Addr]; !ignore && update.Online { + if s.SelfAddr != update.Addr && update.Online { if _, exists := s.services[update.Addr]; !exists { host, port, _ := net.SplitHostPort(update.Addr) s.services[update.Addr] = &Service{ @@ -84,6 +84,9 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { delete(s.services, update.Addr) } else { s.Unlock() + if s.SelfAddr == update.Addr { + s.updateWatches(update) + } continue } } @@ -139,8 +142,17 @@ func (s *ServiceSet) Leader() chan *Service { return } if !update.Online && update.Addr == leader.Addr { - leader = s.Services()[0] - s.leaders <- leader + if len(s.Services()) == 0 && s.self != nil { + s.leaders <- s.self + } else { + leader = s.Services()[0] + if s.self != nil && leader.Created > s.self.Created { + // self is real leader + s.leaders <- s.self + } else { + s.leaders <- leader + } + } } } }() @@ -201,17 +213,6 @@ func (s *ServiceSet) Filter(attrs map[string]string) { } } -func (s *ServiceSet) Ignore(addr string) { - s.Lock() - defer s.Unlock() - s.ignores[addr] = struct{}{} - for key, service := range s.services { - if service.Addr == addr { - delete(s.services, key) - } - } -} - func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool, fireOnce bool) { s.Lock() s.watches[ch] = fireOnce @@ -311,10 +312,40 @@ func (c *Client) RegisterWithSet(name, addr string, attributes map[string]string c.Unregister(name, addr) return nil, err } - set.Ignore(c.expandedAddrs[addr]) + set.SelfAddr = c.expandedAddrs[addr] + _, exists := set.services[set.SelfAddr] + if !exists { + update := <-set.Wait() + for update.Addr != set.SelfAddr { + update = <-set.Wait() + } + } + set.Lock() + set.self = set.services[set.SelfAddr] + delete(set.services, set.SelfAddr) + set.Unlock() return set, nil } +func (c *Client) RegisterAndStandby(name, addr string, attributes map[string]string) (chan *Service, error) { + set, err := c.RegisterWithSet(name, addr, attributes) + if err != nil { + return nil, err + } + standbyCh := make(chan *Service) + go func() { + for { + leader := <-set.Leader() + if leader.Addr == set.SelfAddr { + set.Close() + standbyCh <- leader + return + } + } + }() + return standbyCh, err +} + func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string]string) error { args := &agent.Args{ Name: name, diff --git a/client_test.go b/client_test.go index 9b4ceb3d32..028513775e 100644 --- a/client_test.go +++ b/client_test.go @@ -351,7 +351,7 @@ func TestLeaderChannel(t *testing.T) { assert(client.Unregister(serviceName, ":1111"), t) if leader.Addr != ":2222" { - t.Fatal("Incorrect leader") + t.Fatal("Incorrect leader", leader) } assert(client.Unregister(serviceName, ":2222"), t) @@ -362,3 +362,60 @@ func TestLeaderChannel(t *testing.T) { assert(set.Close(), t) } + +func TestRegisterWithSetLeaderSelf(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "registerWithSetLeaderSelfTest" + + assert(client.Register(serviceName, ":1111"), t) + + set, err := client.RegisterWithSet(serviceName, ":2222", nil) + assert(err, t) + + var leader *Service + + go func() { + for { + leader = <-set.Leader() + } + }() + + assert(client.Register(serviceName, ":3333"), t) + + if leader.Addr != ":1111" { + t.Fatal("Incorrect leader") + } + + assert(client.Unregister(serviceName, ":1111"), t) + + if leader.Addr != set.SelfAddr { + t.Fatal("Incorrect leader", leader) + } + + assert(set.Close(), t) + +} + +func TestRegisterAndStandby(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "registerAndStandbyTest" + + assert(client.Register(serviceName, ":1111"), t) + + standbyCh, err := client.RegisterAndStandby(serviceName, ":2222", nil) + assert(err, t) + + assert(client.Register(serviceName, ":3333"), t) + assert(client.Unregister(serviceName, ":3333"), t) + assert(client.Unregister(serviceName, ":1111"), t) + + leader := <-standbyCh + if leader.Addr != ":2222" { + t.Fatal("Incorrect leader", leader) + } + +} From 06d7dcad821b4c4403b44d2904ac0ff0621aec98 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sun, 5 Jan 2014 21:38:38 -0600 Subject: [PATCH 06/14] discoverd/client: fixing some of the easier issues from the review --- client.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/client.go b/client.go index 8535ff3798..ac45984b1c 100644 --- a/client.go +++ b/client.go @@ -136,8 +136,7 @@ func (s *ServiceSet) Leader() chan *Service { go func() { leader := s.Services()[0] s.leaders <- leader - for { - update := <-updates + for update := range updates { if update == nil { return } @@ -284,7 +283,7 @@ func (c *Client) ServiceSet(name string) (*ServiceSet, error) { return set, nil } -func (c *Client) Services(name string, timeout int) ([]*Service, error) { +func (c *Client) Services(name string, timeout time.Duration) ([]*Service, error) { set, err := c.ServiceSet(name) if err != nil { return nil, err @@ -334,8 +333,7 @@ func (c *Client) RegisterAndStandby(name, addr string, attributes map[string]str } standbyCh := make(chan *Service) go func() { - for { - leader := <-set.Leader() + for leader := range set.Leader() { if leader.Addr == set.SelfAddr { set.Close() standbyCh <- leader @@ -343,7 +341,7 @@ func (c *Client) RegisterAndStandby(name, addr string, attributes map[string]str } } }() - return standbyCh, err + return standbyCh, nil } func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string]string) error { From 7df6ec4192387d91f1ba9004fa7ab87041b665e2 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sun, 5 Jan 2014 21:57:21 -0600 Subject: [PATCH 07/14] discoverd/client: made explicit function to determine leader and renamed Leader to Leaders --- client.go | 36 +++++++++++++++++++----------------- client_test.go | 4 ++-- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/client.go b/client.go index ac45984b1c..25136a4962 100644 --- a/client.go +++ b/client.go @@ -126,7 +126,21 @@ func (s *ServiceSet) matchFilters(attrs map[string]string) bool { return true } -func (s *ServiceSet) Leader() chan *Service { +func (s *ServiceSet) Leader() *Service { + services := s.Services() + if len(services) > 0 { + if s.self != nil && services[0].Created > s.self.Created { + return s.self + } + return services[0] + } + if s.self != nil { + return s.self + } + return nil +} + +func (s *ServiceSet) Leaders() chan *Service { if s.leaders != nil { return s.leaders } @@ -134,24 +148,12 @@ func (s *ServiceSet) Leader() chan *Service { updates := make(chan *agent.ServiceUpdate) s.Watch(updates, false, false) go func() { - leader := s.Services()[0] + leader := s.Leader() s.leaders <- leader for update := range updates { - if update == nil { - return - } if !update.Online && update.Addr == leader.Addr { - if len(s.Services()) == 0 && s.self != nil { - s.leaders <- s.self - } else { - leader = s.Services()[0] - if s.self != nil && leader.Created > s.self.Created { - // self is real leader - s.leaders <- s.self - } else { - s.leaders <- leader - } - } + leader = s.Leader() + s.leaders <- leader } } }() @@ -333,7 +335,7 @@ func (c *Client) RegisterAndStandby(name, addr string, attributes map[string]str } standbyCh := make(chan *Service) go func() { - for leader := range set.Leader() { + for leader := range set.Leaders() { if leader.Addr == set.SelfAddr { set.Close() standbyCh <- leader diff --git a/client_test.go b/client_test.go index 028513775e..fa1e49d44f 100644 --- a/client_test.go +++ b/client_test.go @@ -337,7 +337,7 @@ func TestLeaderChannel(t *testing.T) { go func() { for { - leader = <-set.Leader() + leader = <-set.Leaders() } }() @@ -378,7 +378,7 @@ func TestRegisterWithSetLeaderSelf(t *testing.T) { go func() { for { - leader = <-set.Leader() + leader = <-set.Leaders() } }() From 3d77e4d13e639fdadb41aded05ec01cb4e856ca4 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sun, 5 Jan 2014 22:04:05 -0600 Subject: [PATCH 08/14] discoverd/client: put mutexes behind unexported member --- client.go | 56 +++++++++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/client.go b/client.go index 25136a4962..a74386ef0a 100644 --- a/client.go +++ b/client.go @@ -22,7 +22,7 @@ type Service struct { } type ServiceSet struct { - sync.Mutex + l sync.Mutex services map[string]*Service filters map[string]string watches map[chan *agent.ServiceUpdate]bool @@ -62,9 +62,9 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { isCurrent = true continue } - s.Lock() + s.l.Lock() if s.filters != nil && !s.matchFilters(update.Attrs) { - s.Unlock() + s.l.Unlock() continue } if s.SelfAddr != update.Addr && update.Online { @@ -83,14 +83,14 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { if _, exists := s.services[update.Addr]; exists { delete(s.services, update.Addr) } else { - s.Unlock() + s.l.Unlock() if s.SelfAddr == update.Addr { s.updateWatches(update) } continue } } - s.Unlock() + s.l.Unlock() s.updateWatches(update) } s.closeWatches() @@ -99,8 +99,8 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { } func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { - s.Lock() - defer s.Unlock() + s.l.Lock() + defer s.l.Unlock() for ch, once := range s.watches { ch <- update if once { @@ -110,8 +110,8 @@ func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { } func (s *ServiceSet) closeWatches() { - s.Lock() - defer s.Unlock() + s.l.Lock() + defer s.l.Unlock() for ch := range s.watches { close(ch) } @@ -167,8 +167,8 @@ func (a serviceByAge) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a serviceByAge) Less(i, j int) bool { return a[i].Created < a[j].Created } func (s *ServiceSet) Services() []*Service { - s.Lock() - defer s.Unlock() + s.l.Lock() + defer s.l.Unlock() list := make([]*Service, 0, len(s.services)) for _, service := range s.services { list = append(list, copyService(service)) @@ -188,8 +188,8 @@ func (s *ServiceSet) Addrs() []string { } func (s *ServiceSet) Select(attrs map[string]string) []*Service { - s.Lock() - defer s.Unlock() + s.l.Lock() + defer s.l.Unlock() list := make([]*Service, 0, len(s.services)) outer: for _, service := range s.services { @@ -204,8 +204,8 @@ outer: } func (s *ServiceSet) Filter(attrs map[string]string) { - s.Lock() - defer s.Unlock() + s.l.Lock() + defer s.l.Unlock() s.filters = attrs for key, service := range s.services { if !s.matchFilters(service.Attrs) { @@ -215,13 +215,13 @@ func (s *ServiceSet) Filter(attrs map[string]string) { } func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool, fireOnce bool) { - s.Lock() + s.l.Lock() s.watches[ch] = fireOnce - s.Unlock() + s.l.Unlock() if bringCurrent { go func() { - s.Lock() - defer s.Unlock() + s.l.Lock() + defer s.l.Unlock() for _, service := range s.services { ch <- &agent.ServiceUpdate{ Name: service.Name, @@ -236,8 +236,8 @@ func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool, fire } func (s *ServiceSet) Unwatch(ch chan *agent.ServiceUpdate) { - s.Lock() - defer s.Unlock() + s.l.Lock() + defer s.l.Unlock() delete(s.watches, ch) } @@ -252,7 +252,7 @@ func (s *ServiceSet) Close() error { } type Client struct { - sync.Mutex + l sync.Mutex client *rpcplus.Client heartbeats map[string]chan struct{} expandedAddrs map[string]string @@ -321,10 +321,10 @@ func (c *Client) RegisterWithSet(name, addr string, attributes map[string]string update = <-set.Wait() } } - set.Lock() + set.l.Lock() set.self = set.services[set.SelfAddr] delete(set.services, set.SelfAddr) - set.Unlock() + set.l.Unlock() return set, nil } @@ -358,10 +358,10 @@ func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string return errors.New("discover: register failed: " + err.Error()) } done := make(chan struct{}) - c.Lock() + c.l.Lock() c.heartbeats[args.Addr] = done c.expandedAddrs[args.Addr] = ret - c.Unlock() + c.l.Unlock() go func() { ticker := time.NewTicker(agent.HeartbeatIntervalSecs * time.Second) // TODO: add jitter defer ticker.Stop() @@ -386,10 +386,10 @@ func (c *Client) Unregister(name, addr string) error { Name: name, Addr: addr, } - c.Lock() + c.l.Lock() close(c.heartbeats[args.Addr]) delete(c.heartbeats, args.Addr) - c.Unlock() + c.l.Unlock() err := c.client.Call("Agent.Unregister", args, &struct{}{}) if err != nil { return errors.New("discover: unregister failed: " + err.Error()) From 14b50fad53b9c3dfd3c0c334c5a5c01636a7ed1a Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Sun, 5 Jan 2014 22:46:15 -0600 Subject: [PATCH 09/14] discoverd/client: fixing races. jonathan is my race detector. mine is broken. --- client.go | 29 +++++++++++++++-------------- client_test.go | 6 ++++-- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/client.go b/client.go index a74386ef0a..aef4a2148a 100644 --- a/client.go +++ b/client.go @@ -26,7 +26,6 @@ type ServiceSet struct { services map[string]*Service filters map[string]string watches map[chan *agent.ServiceUpdate]bool - leaders chan *Service call *rpcplus.Call self *Service SelfAddr string @@ -83,9 +82,11 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { if _, exists := s.services[update.Addr]; exists { delete(s.services, update.Addr) } else { - s.l.Unlock() if s.SelfAddr == update.Addr { + s.l.Unlock() s.updateWatches(update) + } else { + s.l.Unlock() } continue } @@ -141,23 +142,20 @@ func (s *ServiceSet) Leader() *Service { } func (s *ServiceSet) Leaders() chan *Service { - if s.leaders != nil { - return s.leaders - } - s.leaders = make(chan *Service) + leaders := make(chan *Service) updates := make(chan *agent.ServiceUpdate) s.Watch(updates, false, false) go func() { leader := s.Leader() - s.leaders <- leader + leaders <- leader for update := range updates { if !update.Online && update.Addr == leader.Addr { leader = s.Leader() - s.leaders <- leader + leaders <- leader } } }() - return s.leaders + return leaders } type serviceByAge []*Service @@ -313,12 +311,15 @@ func (c *Client) RegisterWithSet(name, addr string, attributes map[string]string c.Unregister(name, addr) return nil, err } + set.l.Lock() set.SelfAddr = c.expandedAddrs[addr] - _, exists := set.services[set.SelfAddr] - if !exists { - update := <-set.Wait() - for update.Addr != set.SelfAddr { - update = <-set.Wait() + set.l.Unlock() + updates := make(chan *agent.ServiceUpdate) + set.Watch(updates, true, false) + for update := range updates { + if update.Addr == set.SelfAddr { + set.Unwatch(updates) + break } } set.l.Lock() diff --git a/client_test.go b/client_test.go index fa1e49d44f..a4f43806c2 100644 --- a/client_test.go +++ b/client_test.go @@ -336,8 +336,9 @@ func TestLeaderChannel(t *testing.T) { var leader *Service go func() { + leaders := set.Leaders() for { - leader = <-set.Leaders() + leader = <-leaders } }() @@ -377,8 +378,9 @@ func TestRegisterWithSetLeaderSelf(t *testing.T) { var leader *Service go func() { + leaders := set.Leaders() for { - leader = <-set.Leaders() + leader = <-leaders } }() From 83dce0cea7a2731f5c35f98819ae7bfaea31eaf9 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Mon, 6 Jan 2014 00:49:04 -0600 Subject: [PATCH 10/14] discoverd/client: cleaned up Wait/Watch api instead of adding another Updates method. also fixes the race with bringCurrent --- client.go | 51 ++++++++++++++++++++++---------------------------- client_test.go | 5 ++--- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/client.go b/client.go index aef4a2148a..d8078781db 100644 --- a/client.go +++ b/client.go @@ -143,8 +143,7 @@ func (s *ServiceSet) Leader() *Service { func (s *ServiceSet) Leaders() chan *Service { leaders := make(chan *Service) - updates := make(chan *agent.ServiceUpdate) - s.Watch(updates, false, false) + updates := s.Watch(false, false) go func() { leader := s.Leader() leaders <- leader @@ -212,25 +211,26 @@ func (s *ServiceSet) Filter(attrs map[string]string) { } } -func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool, fireOnce bool) { +func (s *ServiceSet) Watch(bringCurrent bool, fireOnce bool) chan *agent.ServiceUpdate { s.l.Lock() - s.watches[ch] = fireOnce - s.l.Unlock() + defer s.l.Unlock() + var updates chan *agent.ServiceUpdate if bringCurrent { - go func() { - s.l.Lock() - defer s.l.Unlock() - for _, service := range s.services { - ch <- &agent.ServiceUpdate{ - Name: service.Name, - Addr: service.Addr, - Online: true, - Attrs: service.Attrs, - Created: service.Created, - } + updates = make(chan *agent.ServiceUpdate, len(s.services)) + for _, service := range s.services { + updates <- &agent.ServiceUpdate{ + Name: service.Name, + Addr: service.Addr, + Online: true, + Attrs: service.Attrs, + Created: service.Created, } - }() + } + } else { + updates = make(chan *agent.ServiceUpdate) } + s.watches[updates] = fireOnce + return updates } func (s *ServiceSet) Unwatch(ch chan *agent.ServiceUpdate) { @@ -239,12 +239,6 @@ func (s *ServiceSet) Unwatch(ch chan *agent.ServiceUpdate) { delete(s.watches, ch) } -func (s *ServiceSet) Wait() chan *agent.ServiceUpdate { - updateCh := make(chan *agent.ServiceUpdate, 1024) // buffer because of Watch bringCurrent race bug - s.Watch(updateCh, true, true) - return updateCh -} - func (s *ServiceSet) Close() error { return s.call.CloseStream() } @@ -283,17 +277,17 @@ func (c *Client) ServiceSet(name string) (*ServiceSet, error) { return set, nil } -func (c *Client) Services(name string, timeout time.Duration) ([]*Service, error) { +func (c *Client) Services(name string, timeoutSec time.Duration) ([]*Service, error) { set, err := c.ServiceSet(name) if err != nil { return nil, err } defer set.Close() select { - case <-set.Wait(): + case <-set.Watch(true, true): return set.Services(), nil - case <-time.After(time.Duration(timeout) * time.Second): - return nil, errors.New("discover: wait timeout exceeded") + case <-time.After(timeoutSec * time.Second): + return nil, errors.New("discover: timeout exceeded") } } @@ -314,8 +308,7 @@ func (c *Client) RegisterWithSet(name, addr string, attributes map[string]string set.l.Lock() set.SelfAddr = c.expandedAddrs[addr] set.l.Unlock() - updates := make(chan *agent.ServiceUpdate) - set.Watch(updates, true, false) + updates := set.Watch(true, false) for update := range updates { if update.Addr == set.SelfAddr { set.Unwatch(updates) diff --git a/client_test.go b/client_test.go index a4f43806c2..4e03a764a0 100644 --- a/client_test.go +++ b/client_test.go @@ -147,7 +147,7 @@ func TestNewAttributes(t *testing.T) { assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}), t) assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "baz"}), t) - <-set.Wait() + <-set.Watch(true, true) if set.Services()[0].Attrs["foo"] != "baz" { t.Fatal("Attribute not set on re-registered service as 'baz'") } @@ -233,8 +233,7 @@ func TestWatch(t *testing.T) { set, err := client.ServiceSet(serviceName) assert(err, t) - updates := make(chan *agent.ServiceUpdate) - set.Watch(updates, true, false) + updates := set.Watch(true, false) assert(client.Register(serviceName, ":3333"), t) for i := 0; i < 3; i++ { var update *agent.ServiceUpdate From aa5f401d143a876a250a0bcb47342dba2e5e9d56 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Mon, 6 Jan 2014 10:57:59 -0600 Subject: [PATCH 11/14] discoverd/client: UnregisterAll convenience --- client.go | 21 +++++++++++++++++++++ client_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/client.go b/client.go index d8078781db..8d978d86f9 100644 --- a/client.go +++ b/client.go @@ -248,6 +248,7 @@ type Client struct { client *rpcplus.Client heartbeats map[string]chan struct{} expandedAddrs map[string]string + names map[string]string } func NewClient() (*Client, error) { @@ -264,6 +265,7 @@ func NewClientUsingAddress(addr string) (*Client, error) { client: client, heartbeats: make(map[string]chan struct{}), expandedAddrs: make(map[string]string), + names: make(map[string]string), }, err } @@ -355,6 +357,7 @@ func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string c.l.Lock() c.heartbeats[args.Addr] = done c.expandedAddrs[args.Addr] = ret + c.names[args.Addr] = name c.l.Unlock() go func() { ticker := time.NewTicker(agent.HeartbeatIntervalSecs * time.Second) // TODO: add jitter @@ -390,3 +393,21 @@ func (c *Client) Unregister(name, addr string) error { } return nil } + +func (c *Client) UnregisterAll() error { + c.l.Lock() + addrs := make([]string, 0, len(c.heartbeats)) + names := make([]string, 0, len(c.heartbeats)) + for addr, _ := range c.heartbeats { + addrs = append(addrs, addr) + names = append(names, c.names[addr]) + } + c.l.Unlock() + for i := range addrs { + err := c.Unregister(names[i], addrs[i]) + if err != nil { + return err + } + } + return nil +} diff --git a/client_test.go b/client_test.go index 4e03a764a0..03e965a7e7 100644 --- a/client_test.go +++ b/client_test.go @@ -420,3 +420,32 @@ func TestRegisterAndStandby(t *testing.T) { } } + +func TestUnregisterAll(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "unregisterAllTest" + + assert(client.Register(serviceName, ":1111"), t) + assert(client.Register(serviceName, ":2222"), t) + assert(client.Register(serviceName, ":3333"), t) + + services, err := client.Services(serviceName, 1) + assert(err, t) + if len(services) != 3 { + t.Fatal("Wrong number of services") + } + + assert(client.UnregisterAll(), t) + + set, err := client.ServiceSet("nonexistent") + assert(err, t) + + if len(set.Services()) != 0 { + t.Fatal("There should be no services") + } + + assert(set.Close(), t) + +} From 4a596373eba60821f1b1aa341ac684e0476911da Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Mon, 6 Jan 2014 12:55:48 -0600 Subject: [PATCH 12/14] discoverd/client: adding default client and top-level calls. renamed ServiceSet function to NewServiceSet to avoid collision with type. other fixes from review --- client.go | 109 ++++++++++++++++++++++++++++++++++++++++++++----- client_test.go | 45 ++++++++++++++++---- 2 files changed, 136 insertions(+), 18 deletions(-) diff --git a/client.go b/client.go index 8d978d86f9..0b4f36f2a5 100644 --- a/client.go +++ b/client.go @@ -101,10 +101,18 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { s.l.Lock() - defer s.l.Unlock() - for ch, once := range s.watches { - ch <- update + watches := make(map[chan *agent.ServiceUpdate]bool) + for k, v := range s.watches { + watches[k] = v + } + s.l.Unlock() + for ch, once := range watches { + select { + case ch <- update: + case <-time.After(time.Millisecond): + } if once { + close(ch) delete(s.watches, ch) } } @@ -112,8 +120,12 @@ func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { func (s *ServiceSet) closeWatches() { s.l.Lock() - defer s.l.Unlock() - for ch := range s.watches { + watches := make(map[chan *agent.ServiceUpdate]bool) + for k, v := range s.watches { + watches[k] = v + } + s.l.Unlock() + for ch := range watches { close(ch) } } @@ -236,6 +248,7 @@ func (s *ServiceSet) Watch(bringCurrent bool, fireOnce bool) chan *agent.Service func (s *ServiceSet) Unwatch(ch chan *agent.ServiceUpdate) { s.l.Lock() defer s.l.Unlock() + close(ch) delete(s.watches, ch) } @@ -269,7 +282,7 @@ func NewClientUsingAddress(addr string) (*Client, error) { }, err } -func (c *Client) ServiceSet(name string) (*ServiceSet, error) { +func (c *Client) NewServiceSet(name string) (*ServiceSet, error) { updates := make(chan *agent.ServiceUpdate) call := c.client.StreamGo("Agent.Subscribe", &agent.Args{ Name: name, @@ -279,8 +292,8 @@ func (c *Client) ServiceSet(name string) (*ServiceSet, error) { return set, nil } -func (c *Client) Services(name string, timeoutSec time.Duration) ([]*Service, error) { - set, err := c.ServiceSet(name) +func (c *Client) Services(name string, timeout time.Duration) ([]*Service, error) { + set, err := c.NewServiceSet(name) if err != nil { return nil, err } @@ -288,7 +301,7 @@ func (c *Client) Services(name string, timeoutSec time.Duration) ([]*Service, er select { case <-set.Watch(true, true): return set.Services(), nil - case <-time.After(timeoutSec * time.Second): + case <-time.After(timeout): return nil, errors.New("discover: timeout exceeded") } } @@ -302,13 +315,15 @@ func (c *Client) RegisterWithSet(name, addr string, attributes map[string]string if err != nil { return nil, err } - set, err := c.ServiceSet(name) + set, err := c.NewServiceSet(name) if err != nil { c.Unregister(name, addr) return nil, err } set.l.Lock() + c.l.Lock() set.SelfAddr = c.expandedAddrs[addr] + c.l.Unlock() set.l.Unlock() updates := set.Watch(true, false) for update := range updates { @@ -411,3 +426,77 @@ func (c *Client) UnregisterAll() error { } return nil } + +var defaultClient *Client + +func Connect(addr string) (err error) { + if addr == "" { + defaultClient, err = NewClient() + return + } + defaultClient, err = NewClientUsingAddress(addr) + return +} + +func ensureDefaultConnected() error { + if defaultClient == nil { + return Connect("") + } + return nil +} + +func NewServiceSet(name string) (*ServiceSet, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.NewServiceSet(name) +} + +func Services(name string, timeout time.Duration) ([]*Service, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.Services(name, timeout) +} + +func Register(name, addr string) error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.Register(name, addr) +} + +func RegisterWithSet(name, addr string, attributes map[string]string) (*ServiceSet, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.RegisterWithSet(name, addr, attributes) +} + +func RegisterAndStandby(name, addr string, attributes map[string]string) (chan *Service, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.RegisterAndStandby(name, addr, attributes) +} + +func RegisterWithAttributes(name, addr string, attributes map[string]string) error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.RegisterWithAttributes(name, addr, attributes) +} + +func Unregister(name, addr string) error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.Unregister(name, addr) +} + +func UnregisterAll() error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.UnregisterAll() +} diff --git a/client_test.go b/client_test.go index 03e965a7e7..ca83e0a14f 100644 --- a/client_test.go +++ b/client_test.go @@ -116,7 +116,7 @@ func TestBasicRegisterAndServiceSet(t *testing.T) { assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}), t) assert(client.Register(serviceName, ":2222"), t) - set, err := client.ServiceSet(serviceName) + set, err := client.NewServiceSet(serviceName) assert(err, t) if len(set.Services()) < 2 { @@ -141,7 +141,7 @@ func TestNewAttributes(t *testing.T) { serviceName := "attributeTest" - set, err := client.ServiceSet(serviceName) + set, err := client.NewServiceSet(serviceName) assert(err, t) assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}), t) @@ -161,7 +161,7 @@ func TestFiltering(t *testing.T) { serviceName := "filterTest" - set, err := client.ServiceSet(serviceName) + set, err := client.NewServiceSet(serviceName) assert(err, t) assert(client.Register(serviceName, ":1111"), t) @@ -191,7 +191,7 @@ func TestSelecting(t *testing.T) { serviceName := "selectTest" - set, err := client.ServiceSet(serviceName) + set, err := client.NewServiceSet(serviceName) assert(err, t) assert(client.Register(serviceName, ":1111"), t) @@ -230,7 +230,7 @@ func TestWatch(t *testing.T) { assert(client.Register(serviceName, ":1111"), t) assert(client.Register(serviceName, ":2222"), t) - set, err := client.ServiceSet(serviceName) + set, err := client.NewServiceSet(serviceName) assert(err, t) updates := set.Watch(true, false) @@ -257,7 +257,7 @@ func TestNoServices(t *testing.T) { client, cleanup := setup(t) defer cleanup() - set, err := client.ServiceSet("nonexistent") + set, err := client.NewServiceSet("nonexistent") assert(err, t) if len(set.Services()) != 0 { @@ -329,7 +329,7 @@ func TestLeaderChannel(t *testing.T) { assert(client.Register(serviceName, ":1111"), t) - set, err := client.ServiceSet(serviceName) + set, err := client.NewServiceSet(serviceName) assert(err, t) var leader *Service @@ -439,7 +439,36 @@ func TestUnregisterAll(t *testing.T) { assert(client.UnregisterAll(), t) - set, err := client.ServiceSet("nonexistent") + set, err := client.NewServiceSet(serviceName) + assert(err, t) + + if len(set.Services()) != 0 { + t.Fatal("There should be no services") + } + + assert(set.Close(), t) + +} + +func TestDefaulClient(t *testing.T) { + _, cleanup := setup(t) + defer cleanup() + + serviceName := "defaultClientTest" + + assert(Register(serviceName, ":1111"), t) + assert(Register(serviceName, ":2222"), t) + assert(Register(serviceName, ":3333"), t) + + services, err := Services(serviceName, 1) + assert(err, t) + if len(services) != 3 { + t.Fatal("Wrong number of services") + } + + assert(UnregisterAll(), t) + + set, err := NewServiceSet(serviceName) assert(err, t) if len(set.Services()) != 0 { From 91480cc097987b1800f1f59e92f991fbca77d4c3 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Mon, 6 Jan 2014 13:38:55 -0600 Subject: [PATCH 13/14] discoverd/client: race and review fixes from review and race detection --- client.go | 6 ++++-- client_test.go | 18 +++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 0b4f36f2a5..840c1683e9 100644 --- a/client.go +++ b/client.go @@ -101,7 +101,7 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { s.l.Lock() - watches := make(map[chan *agent.ServiceUpdate]bool) + watches := make(map[chan *agent.ServiceUpdate]bool, len(s.watches)) for k, v := range s.watches { watches[k] = v } @@ -113,14 +113,16 @@ func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { } if once { close(ch) + s.l.Lock() delete(s.watches, ch) + s.l.Unlock() } } } func (s *ServiceSet) closeWatches() { s.l.Lock() - watches := make(map[chan *agent.ServiceUpdate]bool) + watches := make(map[chan *agent.ServiceUpdate]bool, len(s.watches)) for k, v := range s.watches { watches[k] = v } diff --git a/client_test.go b/client_test.go index ca83e0a14f..fb152bbd76 100644 --- a/client_test.go +++ b/client_test.go @@ -332,31 +332,31 @@ func TestLeaderChannel(t *testing.T) { set, err := client.NewServiceSet(serviceName) assert(err, t) - var leader *Service + leader := make(chan *Service, 3) go func() { leaders := set.Leaders() for { - leader = <-leaders + leader <- <-leaders } }() assert(client.Register(serviceName, ":2222"), t) - if leader.Addr != ":1111" { + if (<-leader).Addr != ":1111" { t.Fatal("Incorrect leader") } assert(client.Register(serviceName, ":3333"), t) assert(client.Unregister(serviceName, ":1111"), t) - if leader.Addr != ":2222" { + if (<-leader).Addr != ":2222" { t.Fatal("Incorrect leader", leader) } assert(client.Unregister(serviceName, ":2222"), t) - if leader.Addr != ":3333" { + if (<-leader).Addr != ":3333" { t.Fatal("Incorrect leader") } @@ -374,24 +374,24 @@ func TestRegisterWithSetLeaderSelf(t *testing.T) { set, err := client.RegisterWithSet(serviceName, ":2222", nil) assert(err, t) - var leader *Service + leader := make(chan *Service, 2) go func() { leaders := set.Leaders() for { - leader = <-leaders + leader <- <-leaders } }() assert(client.Register(serviceName, ":3333"), t) - if leader.Addr != ":1111" { + if (<-leader).Addr != ":1111" { t.Fatal("Incorrect leader") } assert(client.Unregister(serviceName, ":1111"), t) - if leader.Addr != set.SelfAddr { + if (<-leader).Addr != set.SelfAddr { t.Fatal("Incorrect leader", leader) } From a090611c8b59f8ad3e7fdf89148d5754c31a127d Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Mon, 6 Jan 2014 13:45:55 -0600 Subject: [PATCH 14/14] discoverd/client: lock it up --- client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client.go b/client.go index 840c1683e9..3dd4e2ded0 100644 --- a/client.go +++ b/client.go @@ -430,6 +430,7 @@ func (c *Client) UnregisterAll() error { } var defaultClient *Client +var defaultEnsureLock = &sync.Mutex{} func Connect(addr string) (err error) { if addr == "" { @@ -441,6 +442,8 @@ func Connect(addr string) (err error) { } func ensureDefaultConnected() error { + defaultEnsureLock.Lock() + defer defaultEnsureLock.Unlock() if defaultClient == nil { return Connect("") }