diff --git a/client.go b/client.go index 3a172f421c..3dd4e2ded0 100644 --- a/client.go +++ b/client.go @@ -12,8 +12,6 @@ import ( "github.com/flynn/rpcplus" ) -var WaitTimeoutSecs = 10 - type Service struct { Created uint Name string @@ -24,13 +22,13 @@ type Service struct { } type ServiceSet struct { - services map[string]*Service - filters map[string]string - watches map[chan *agent.ServiceUpdate]struct{} - serMutex sync.Mutex - filMutex sync.Mutex - watchMutex sync.Mutex - call *rpcplus.Call + l sync.Mutex + services map[string]*Service + filters map[string]string + watches map[chan *agent.ServiceUpdate]bool + call *rpcplus.Call + self *Service + SelfAddr string } func copyService(service *Service) *Service { @@ -46,7 +44,7 @@ func makeServiceSet(call *rpcplus.Call) *ServiceSet { return &ServiceSet{ services: make(map[string]*Service), filters: make(map[string]string), - watches: make(map[chan *agent.ServiceUpdate]struct{}), + watches: make(map[chan *agent.ServiceUpdate]bool), call: call, } } @@ -63,12 +61,12 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { isCurrent = true continue } + s.l.Lock() if s.filters != nil && !s.matchFilters(update.Attrs) { + s.l.Unlock() continue } - - s.serMutex.Lock() - if update.Online { + if s.SelfAddr != update.Addr && update.Online { if _, exists := s.services[update.Addr]; !exists { host, port, _ := net.SplitHostPort(update.Addr) s.services[update.Addr] = &Service{ @@ -84,11 +82,16 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { if _, exists := s.services[update.Addr]; exists { delete(s.services, update.Addr) } else { - s.serMutex.Unlock() + if s.SelfAddr == update.Addr { + s.l.Unlock() + s.updateWatches(update) + } else { + s.l.Unlock() + } continue } } - s.serMutex.Unlock() + s.l.Unlock() s.updateWatches(update) } s.closeWatches() @@ -97,24 +100,39 @@ func (s *ServiceSet) bind(updates chan *agent.ServiceUpdate) chan struct{} { } func (s *ServiceSet) updateWatches(update *agent.ServiceUpdate) { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() - for ch := range s.watches { - ch <- update + s.l.Lock() + watches := make(map[chan *agent.ServiceUpdate]bool, len(s.watches)) + for k, v := range s.watches { + watches[k] = v + } + s.l.Unlock() + for ch, once := range watches { + select { + case ch <- update: + case <-time.After(time.Millisecond): + } + if once { + close(ch) + s.l.Lock() + delete(s.watches, ch) + s.l.Unlock() + } } } func (s *ServiceSet) closeWatches() { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() - for ch := range s.watches { + s.l.Lock() + watches := make(map[chan *agent.ServiceUpdate]bool, len(s.watches)) + for k, v := range s.watches { + watches[k] = v + } + s.l.Unlock() + for ch := range watches { close(ch) } } func (s *ServiceSet) matchFilters(attrs map[string]string) bool { - s.filMutex.Lock() - defer s.filMutex.Unlock() for key, value := range s.filters { if attrs[key] != value { return false @@ -123,28 +141,52 @@ func (s *ServiceSet) matchFilters(attrs map[string]string) bool { return true } -type serviceByAge []*Service - -func (a serviceByAge) Len() int { return len(a) } -func (a serviceByAge) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a serviceByAge) Less(i, j int) bool { return a[i].Created < a[j].Created } - func (s *ServiceSet) Leader() *Service { services := s.Services() if len(services) > 0 { - sort.Sort(serviceByAge(services)) + if s.self != nil && services[0].Created > s.self.Created { + return s.self + } return services[0] } + if s.self != nil { + return s.self + } return nil } +func (s *ServiceSet) Leaders() chan *Service { + leaders := make(chan *Service) + updates := s.Watch(false, false) + go func() { + leader := s.Leader() + leaders <- leader + for update := range updates { + if !update.Online && update.Addr == leader.Addr { + leader = s.Leader() + leaders <- leader + } + } + }() + return leaders +} + +type serviceByAge []*Service + +func (a serviceByAge) Len() int { return len(a) } +func (a serviceByAge) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a serviceByAge) Less(i, j int) bool { return a[i].Created < a[j].Created } + func (s *ServiceSet) Services() []*Service { - s.serMutex.Lock() - defer s.serMutex.Unlock() + s.l.Lock() + defer s.l.Unlock() list := make([]*Service, 0, len(s.services)) for _, service := range s.services { list = append(list, copyService(service)) } + if len(list) > 0 { + sort.Sort(serviceByAge(list)) + } return list } @@ -157,8 +199,8 @@ func (s *ServiceSet) Addrs() []string { } func (s *ServiceSet) Select(attrs map[string]string) []*Service { - s.serMutex.Lock() - defer s.serMutex.Unlock() + s.l.Lock() + defer s.l.Unlock() list := make([]*Service, 0, len(s.services)) outer: for _, service := range s.services { @@ -173,11 +215,9 @@ outer: } func (s *ServiceSet) Filter(attrs map[string]string) { - s.filMutex.Lock() + s.l.Lock() + defer s.l.Unlock() s.filters = attrs - s.filMutex.Unlock() - s.serMutex.Lock() - defer s.serMutex.Unlock() for key, service := range s.services { if !s.matchFilters(service.Attrs) { delete(s.services, key) @@ -185,53 +225,45 @@ func (s *ServiceSet) Filter(attrs map[string]string) { } } -func (s *ServiceSet) Watch(ch chan *agent.ServiceUpdate, bringCurrent bool) { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() - s.watches[ch] = struct{}{} +func (s *ServiceSet) Watch(bringCurrent bool, fireOnce bool) chan *agent.ServiceUpdate { + s.l.Lock() + defer s.l.Unlock() + var updates chan *agent.ServiceUpdate if bringCurrent { - go func() { - s.serMutex.Lock() - defer s.serMutex.Unlock() - for _, service := range s.services { - ch <- &agent.ServiceUpdate{ - Name: service.Name, - Addr: service.Addr, - Online: true, - Attrs: service.Attrs, - Created: service.Created, - } + updates = make(chan *agent.ServiceUpdate, len(s.services)) + for _, service := range s.services { + updates <- &agent.ServiceUpdate{ + Name: service.Name, + Addr: service.Addr, + Online: true, + Attrs: service.Attrs, + Created: service.Created, } - }() + } + } else { + updates = make(chan *agent.ServiceUpdate) } + s.watches[updates] = fireOnce + return updates } func (s *ServiceSet) Unwatch(ch chan *agent.ServiceUpdate) { - s.watchMutex.Lock() - defer s.watchMutex.Unlock() + s.l.Lock() + defer s.l.Unlock() + close(ch) delete(s.watches, ch) } -func (s *ServiceSet) Wait() (*agent.ServiceUpdate, error) { - updateCh := make(chan *agent.ServiceUpdate, 1024) // buffer because of Watch bringCurrent race bug - s.Watch(updateCh, true) - defer s.Unwatch(updateCh) - select { - case update := <-updateCh: - return update, nil - case <-time.After(time.Duration(WaitTimeoutSecs) * time.Second): - return nil, errors.New("discover: wait timeout exceeded") - } -} - func (s *ServiceSet) Close() error { return s.call.CloseStream() } type Client struct { - client *rpcplus.Client - heartbeats map[string]chan struct{} - hbMutex sync.Mutex + l sync.Mutex + client *rpcplus.Client + heartbeats map[string]chan struct{} + expandedAddrs map[string]string + names map[string]string } func NewClient() (*Client, error) { @@ -245,12 +277,14 @@ func NewClient() (*Client, error) { func NewClientUsingAddress(addr string) (*Client, error) { client, err := rpcplus.DialHTTP("tcp", addr) return &Client{ - client: client, - heartbeats: make(map[string]chan struct{}), + client: client, + heartbeats: make(map[string]chan struct{}), + expandedAddrs: make(map[string]string), + names: make(map[string]string), }, err } -func (c *Client) ServiceSet(name string) (*ServiceSet, error) { +func (c *Client) NewServiceSet(name string) (*ServiceSet, error) { updates := make(chan *agent.ServiceUpdate) call := c.client.StreamGo("Agent.Subscribe", &agent.Args{ Name: name, @@ -260,24 +294,71 @@ func (c *Client) ServiceSet(name string) (*ServiceSet, error) { return set, nil } -func (c *Client) Services(name string) ([]*Service, error) { - set, err := c.ServiceSet(name) +func (c *Client) Services(name string, timeout time.Duration) ([]*Service, error) { + set, err := c.NewServiceSet(name) if err != nil { return nil, err } - _, err = set.Wait() - if err != nil { - return nil, err + defer set.Close() + select { + case <-set.Watch(true, true): + return set.Services(), nil + case <-time.After(timeout): + return nil, errors.New("discover: timeout exceeded") } - set.Close() - return set.Services(), nil - } func (c *Client) Register(name, addr string) error { return c.RegisterWithAttributes(name, addr, nil) } +func (c *Client) RegisterWithSet(name, addr string, attributes map[string]string) (*ServiceSet, error) { + err := c.RegisterWithAttributes(name, addr, attributes) + if err != nil { + return nil, err + } + set, err := c.NewServiceSet(name) + if err != nil { + c.Unregister(name, addr) + return nil, err + } + set.l.Lock() + c.l.Lock() + set.SelfAddr = c.expandedAddrs[addr] + c.l.Unlock() + set.l.Unlock() + updates := set.Watch(true, false) + for update := range updates { + if update.Addr == set.SelfAddr { + set.Unwatch(updates) + break + } + } + set.l.Lock() + set.self = set.services[set.SelfAddr] + delete(set.services, set.SelfAddr) + set.l.Unlock() + return set, nil +} + +func (c *Client) RegisterAndStandby(name, addr string, attributes map[string]string) (chan *Service, error) { + set, err := c.RegisterWithSet(name, addr, attributes) + if err != nil { + return nil, err + } + standbyCh := make(chan *Service) + go func() { + for leader := range set.Leaders() { + if leader.Addr == set.SelfAddr { + set.Close() + standbyCh <- leader + return + } + } + }() + return standbyCh, nil +} + func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string]string) error { args := &agent.Args{ Name: name, @@ -290,9 +371,11 @@ func (c *Client) RegisterWithAttributes(name, addr string, attributes map[string return errors.New("discover: register failed: " + err.Error()) } done := make(chan struct{}) - c.hbMutex.Lock() + c.l.Lock() c.heartbeats[args.Addr] = done - c.hbMutex.Unlock() + c.expandedAddrs[args.Addr] = ret + c.names[args.Addr] = name + c.l.Unlock() go func() { ticker := time.NewTicker(agent.HeartbeatIntervalSecs * time.Second) // TODO: add jitter defer ticker.Stop() @@ -317,13 +400,108 @@ func (c *Client) Unregister(name, addr string) error { Name: name, Addr: addr, } - c.hbMutex.Lock() + c.l.Lock() close(c.heartbeats[args.Addr]) delete(c.heartbeats, args.Addr) - c.hbMutex.Unlock() + c.l.Unlock() err := c.client.Call("Agent.Unregister", args, &struct{}{}) if err != nil { return errors.New("discover: unregister failed: " + err.Error()) } return nil } + +func (c *Client) UnregisterAll() error { + c.l.Lock() + addrs := make([]string, 0, len(c.heartbeats)) + names := make([]string, 0, len(c.heartbeats)) + for addr, _ := range c.heartbeats { + addrs = append(addrs, addr) + names = append(names, c.names[addr]) + } + c.l.Unlock() + for i := range addrs { + err := c.Unregister(names[i], addrs[i]) + if err != nil { + return err + } + } + return nil +} + +var defaultClient *Client +var defaultEnsureLock = &sync.Mutex{} + +func Connect(addr string) (err error) { + if addr == "" { + defaultClient, err = NewClient() + return + } + defaultClient, err = NewClientUsingAddress(addr) + return +} + +func ensureDefaultConnected() error { + defaultEnsureLock.Lock() + defer defaultEnsureLock.Unlock() + if defaultClient == nil { + return Connect("") + } + return nil +} + +func NewServiceSet(name string) (*ServiceSet, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.NewServiceSet(name) +} + +func Services(name string, timeout time.Duration) ([]*Service, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.Services(name, timeout) +} + +func Register(name, addr string) error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.Register(name, addr) +} + +func RegisterWithSet(name, addr string, attributes map[string]string) (*ServiceSet, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.RegisterWithSet(name, addr, attributes) +} + +func RegisterAndStandby(name, addr string, attributes map[string]string) (chan *Service, error) { + if err := ensureDefaultConnected(); err != nil { + return nil, err + } + return defaultClient.RegisterAndStandby(name, addr, attributes) +} + +func RegisterWithAttributes(name, addr string, attributes map[string]string) error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.RegisterWithAttributes(name, addr, attributes) +} + +func Unregister(name, addr string) error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.Unregister(name, addr) +} + +func UnregisterAll() error { + if err := ensureDefaultConnected(); err != nil { + return err + } + return defaultClient.UnregisterAll() +} diff --git a/client_test.go b/client_test.go index 450d935ac9..fb152bbd76 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,8 @@ package discoverd import ( + "bufio" + "log" "math/rand" "os" "os/exec" @@ -51,9 +53,18 @@ func runDiscoverdServer() func() { doneCh := make(chan struct{}) go func() { cmd := exec.Command("discoverd") + stderr, _ := cmd.StderrPipe() if err := cmd.Start(); err != nil { panic(err) } + if os.Getenv("DEBUG") != "" { + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + log.Println("discoverd:", scanner.Text()) + } + }() + } cmdDone := make(chan error) go func() { cmdDone <- cmd.Wait() @@ -76,115 +87,155 @@ func runDiscoverdServer() func() { } } -func TestClient(t *testing.T) { +func setup(t *testing.T) (*Client, func()) { killEtcd := runEtcdServer() - defer killEtcd() killDiscoverd := runDiscoverdServer() - defer killDiscoverd() - client, err := NewClient() if err != nil { t.Fatal(err) } - serviceName := "testService" - - // Test Register and ServiceSet with attributes - - err = client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) + return client, func() { + killDiscoverd() + killEtcd() } - err = client.Register(serviceName, ":2222") +} + +func assert(err error, t *testing.T) error { if err != nil { - t.Fatal("Registering service failed", err.Error()) + t.Fatal("Unexpected error:", err.Error()) } - set, _ := client.ServiceSet(serviceName) + return err +} + +func TestBasicRegisterAndServiceSet(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "basicTest" + + assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}), t) + assert(client.Register(serviceName, ":2222"), t) + + set, err := client.NewServiceSet(serviceName) + assert(err, t) + if len(set.Services()) < 2 { t.Fatal("Registered services not online") } - err = client.Unregister(serviceName, ":2222") - if err != nil { - t.Fatal("Unregistering service failed", err.Error()) - } + assert(client.Unregister(serviceName, ":2222"), t) if len(set.Services()) != 1 { t.Fatal("Only 1 registered service should be left") } + if set.Services()[0].Attrs["foo"] != "bar" { t.Fatal("Attribute not set on service as 'bar'") } - // Test Re-register + assert(set.Close(), t) +} - err = client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "baz"}) - if err != nil { - t.Fatal("Re-registering service failed", err.Error()) - } +func TestNewAttributes(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "attributeTest" + + set, err := client.NewServiceSet(serviceName) + assert(err, t) + + assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "bar"}), t) + assert(client.RegisterWithAttributes(serviceName, ":1111", map[string]string{"foo": "baz"}), t) + + <-set.Watch(true, true) if set.Services()[0].Attrs["foo"] != "baz" { t.Fatal("Attribute not set on re-registered service as 'baz'") } - err = client.RegisterWithAttributes(serviceName, ":2222", map[string]string{"foo": "qux", "id": "2"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } + assert(set.Close(), t) +} + +func TestFiltering(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "filterTest" + + set, err := client.NewServiceSet(serviceName) + assert(err, t) - // Test Filter + assert(client.Register(serviceName, ":1111"), t) + assert(client.RegisterWithAttributes(serviceName, ":2222", map[string]string{"foo": "qux", "id": "2"}), t) set.Filter(map[string]string{"foo": "qux"}) if len(set.Services()) > 1 { t.Fatal("Filter not limiting online services in set") } - err = client.RegisterWithAttributes(serviceName, ":3333", map[string]string{"foo": "qux", "id": "3"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } + assert(client.RegisterWithAttributes(serviceName, ":3333", map[string]string{"foo": "qux", "id": "3"}), t) if len(set.Services()) < 2 { t.Fatal("Filter not letting new matching services in set") } - err = client.RegisterWithAttributes(serviceName, ":4444", map[string]string{"foo": "baz"}) - if err != nil { - t.Fatal("Registering service failed", err.Error()) - } + assert(client.RegisterWithAttributes(serviceName, ":4444", map[string]string{"foo": "baz"}), t) if len(set.Services()) > 2 { t.Fatal("Filter not limiting new unmatching services from set") } - // Test Select + assert(set.Close(), t) +} + +func TestSelecting(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "selectTest" + + set, err := client.NewServiceSet(serviceName) + assert(err, t) + + assert(client.Register(serviceName, ":1111"), t) + assert(client.RegisterWithAttributes(serviceName, ":2222", map[string]string{"foo": "qux", "id": "2"}), t) + assert(client.RegisterWithAttributes(serviceName, ":3333", map[string]string{"foo": "qux", "id": "3"}), t) if len(set.Select(map[string]string{"id": "3"})) != 1 { t.Fatal("Select not returning proper services") } - // Test Close + assert(set.Close(), t) +} - err = set.Close() - if err != nil { - t.Fatal("Unable to close:", err) - } +func TestServices(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() - // Test client.Services + serviceName := "servicesTest" - services, err := client.Services(serviceName) - if err != nil { - t.Fatal("Unable to get services:", err) - } - if len(services) != 4 { + assert(client.Register(serviceName, ":1111"), t) + assert(client.Register(serviceName, ":2222"), t) + + services, err := client.Services(serviceName, 1) + assert(err, t) + if len(services) != 2 { t.Fatal("Not all registered services were returned:", services) } +} - // Test Watch with bringCurrent +func TestWatch(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() - set, _ = client.ServiceSet(serviceName) - updates := make(chan *agent.ServiceUpdate) - set.Watch(updates, true) - err = client.Register(serviceName, ":5555") - if err != nil { - t.Fatal("Registering service failed", err) - } - for i := 0; i < 5; i++ { + serviceName := "watchTest" + + assert(client.Register(serviceName, ":1111"), t) + assert(client.Register(serviceName, ":2222"), t) + + set, err := client.NewServiceSet(serviceName) + assert(err, t) + + updates := set.Watch(true, false) + assert(client.Register(serviceName, ":3333"), t) + for i := 0; i < 3; i++ { var update *agent.ServiceUpdate select { case update = <-updates: @@ -199,77 +250,231 @@ func TestClient(t *testing.T) { } } + assert(set.Close(), t) } func TestNoServices(t *testing.T) { - killEtcd := runEtcdServer() - defer killEtcd() - killDiscoverd := runDiscoverdServer() - defer killDiscoverd() + client, cleanup := setup(t) + defer cleanup() - client, err := NewClient() - if err != nil { - t.Fatal(err) - } + set, err := client.NewServiceSet("nonexistent") + assert(err, t) - set, _ := client.ServiceSet("nonexistent") if len(set.Services()) != 0 { t.Fatal("There should be no services") } + + assert(set.Close(), t) } -func TestServiceAgeAndLeader(t *testing.T) { - killEtcd := runEtcdServer() - defer killEtcd() - killDiscoverd := runDiscoverdServer() - defer killDiscoverd() +func TestRegisterWithSet(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() - client, err := NewClient() - if err != nil { - t.Fatal(err) + serviceName := "registerWithSetTest" + + assert(client.Register(serviceName, ":1111"), t) + + set, err := client.RegisterWithSet(serviceName, ":2222", nil) + assert(err, t) + + if len(set.Services()) != 1 { + t.Fatal("There should only be one other service") + } + if set.Services()[0].Addr != ":1111" { + t.Fatal("Set contains the wrong service") } - serviceName := "ageService" - err = client.Register(serviceName, ":1111") - if err != nil { - t.Fatal("Registering service failed", err.Error()) + assert(set.Close(), t) + + services, err := client.Services(serviceName, 1) + assert(err, t) + if len(services) != 2 { + t.Fatal("Not all registered services were returned:", services) } - services, _ := client.Services(serviceName) - if len(services) < 1 { - t.Fatal("Registered service not online") +} + +func TestServiceAge(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "ageTest" + + checkOldest := func(addr string) { + services, err := client.Services(serviceName, 1) + assert(err, t) + if services[0].Addr != addr { + t.Fatal("Oldest service is not first in Services() slice") + } } - if services[0].Created < 1 { - t.Fatal("Service has no age") + + assert(client.Register(serviceName, ":1111"), t) + checkOldest(":1111") + assert(client.Register(serviceName, ":2222"), t) + checkOldest(":1111") + assert(client.Register(serviceName, ":3333"), t) + checkOldest(":1111") + assert(client.Register(serviceName, ":4444"), t) + checkOldest(":1111") + assert(client.Unregister(serviceName, ":1111"), t) + checkOldest(":2222") + +} + +func TestLeaderChannel(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "leadersTest" + + assert(client.Register(serviceName, ":1111"), t) + + set, err := client.NewServiceSet(serviceName) + assert(err, t) + + leader := make(chan *Service, 3) + + go func() { + leaders := set.Leaders() + for { + leader <- <-leaders + } + }() + + assert(client.Register(serviceName, ":2222"), t) + + if (<-leader).Addr != ":1111" { + t.Fatal("Incorrect leader") } - err = client.Register(serviceName, ":2222") - if err != nil { - t.Fatal("Registering service failed", err.Error()) + assert(client.Register(serviceName, ":3333"), t) + assert(client.Unregister(serviceName, ":1111"), t) + + if (<-leader).Addr != ":2222" { + t.Fatal("Incorrect leader", leader) } - services, _ = client.Services(serviceName) - if len(services) < 2 { - t.Fatal("Registered services not online") + + assert(client.Unregister(serviceName, ":2222"), t) + + if (<-leader).Addr != ":3333" { + t.Fatal("Incorrect leader") } - if services[0].Port == "1111" { - if services[0].Created >= services[1].Created { - t.Fatal("Older service does not have smaller Created value") - } - } else { - if services[1].Created >= services[0].Created { - t.Fatal("Older service does not have smaller Created value") + + assert(set.Close(), t) +} + +func TestRegisterWithSetLeaderSelf(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "registerWithSetLeaderSelfTest" + + assert(client.Register(serviceName, ":1111"), t) + + set, err := client.RegisterWithSet(serviceName, ":2222", nil) + assert(err, t) + + leader := make(chan *Service, 2) + + go func() { + leaders := set.Leaders() + for { + leader <- <-leaders } + }() + + assert(client.Register(serviceName, ":3333"), t) + + if (<-leader).Addr != ":1111" { + t.Fatal("Incorrect leader") } - err = client.Register(serviceName, ":3333") - if err != nil { - t.Fatal("Registering service failed", err.Error()) + assert(client.Unregister(serviceName, ":1111"), t) + + if (<-leader).Addr != set.SelfAddr { + t.Fatal("Incorrect leader", leader) } - set, _ := client.ServiceSet(serviceName) - if len(set.Services()) < 3 { - t.Fatal("Registered services not online") + + assert(set.Close(), t) + +} + +func TestRegisterAndStandby(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "registerAndStandbyTest" + + assert(client.Register(serviceName, ":1111"), t) + + standbyCh, err := client.RegisterAndStandby(serviceName, ":2222", nil) + assert(err, t) + + assert(client.Register(serviceName, ":3333"), t) + assert(client.Unregister(serviceName, ":3333"), t) + assert(client.Unregister(serviceName, ":1111"), t) + + leader := <-standbyCh + if leader.Addr != ":2222" { + t.Fatal("Incorrect leader", leader) } - if set.Leader().Port != "1111" { - t.Fatal("Incorrect leader") + +} + +func TestUnregisterAll(t *testing.T) { + client, cleanup := setup(t) + defer cleanup() + + serviceName := "unregisterAllTest" + + assert(client.Register(serviceName, ":1111"), t) + assert(client.Register(serviceName, ":2222"), t) + assert(client.Register(serviceName, ":3333"), t) + + services, err := client.Services(serviceName, 1) + assert(err, t) + if len(services) != 3 { + t.Fatal("Wrong number of services") } + assert(client.UnregisterAll(), t) + + set, err := client.NewServiceSet(serviceName) + assert(err, t) + + if len(set.Services()) != 0 { + t.Fatal("There should be no services") + } + + assert(set.Close(), t) + +} + +func TestDefaulClient(t *testing.T) { + _, cleanup := setup(t) + defer cleanup() + + serviceName := "defaultClientTest" + + assert(Register(serviceName, ":1111"), t) + assert(Register(serviceName, ":2222"), t) + assert(Register(serviceName, ":3333"), t) + + services, err := Services(serviceName, 1) + assert(err, t) + if len(services) != 3 { + t.Fatal("Wrong number of services") + } + + assert(UnregisterAll(), t) + + set, err := NewServiceSet(serviceName) + assert(err, t) + + if len(set.Services()) != 0 { + t.Fatal("There should be no services") + } + + assert(set.Close(), t) + }