Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
discoverd: fixes expected behavior of client.Service() as described in
Browse files Browse the repository at this point in the history
  • Loading branch information
progrium committed Oct 7, 2013
1 parent e5471cf commit e1e1977
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 7 deletions.
1 change: 1 addition & 0 deletions discover/backend_etcd.go
Expand Up @@ -39,6 +39,7 @@ func (b *EtcdBackend) Subscribe(name string) (UpdateStream, error) {
stream.ch <- update
}
}
stream.ch <- &ServiceUpdate{}
for u := range watch {
if update := b.responseToUpdate(u); update != nil {
stream.ch <- update
Expand Down
5 changes: 4 additions & 1 deletion discover/backend_etcd_test.go
Expand Up @@ -69,8 +69,11 @@ func TestEtcdBackend_Subscribe(t *testing.T) {
backend.Register("test_subscribe", "10.0.0.4", nil)
defer backend.Unregister("test_subscribe", "10.0.0.4")

for i := 0; i < 4; i++ {
for i := 0; i < 5; i++ {
update := <-updates.Chan()
if update.Addr == "" && update.Name == "" {
continue // skip the update that signals "up to current" event
}
if update.Online != true {
t.Fatal("Unexpected offline service update: ", update, i)
}
Expand Down
14 changes: 12 additions & 2 deletions discover/client.go
Expand Up @@ -29,9 +29,18 @@ type ServiceSet struct {
lisMutex sync.Mutex
}

func (s *ServiceSet) Bind(updates chan *ServiceUpdate) {
func (s *ServiceSet) bind(updates chan *ServiceUpdate) chan bool {
// current is an event when enough service updates have been
// received to bring us to "current" state (when subscribed)
current := make(chan bool)
go func() {
isCurrent := false
for update := range updates {
if update.Addr == "" && update.Name == "" && !isCurrent {
current <- true
isCurrent = true
continue
}
// TODO: apply filters
s.serMutex.Lock()
if _, exists := s.services[update.Addr]; !exists {
Expand All @@ -55,6 +64,7 @@ func (s *ServiceSet) Bind(updates chan *ServiceUpdate) {
}
}
}()
return current
}

func (s *ServiceSet) Online() []*Service {
Expand Down Expand Up @@ -149,7 +159,7 @@ func (c *DiscoverClient) Services(name string) *ServiceSet {
filters: make(map[string]string),
listeners: make(map[chan *ServiceUpdate]struct{}),
}
set.Bind(updates)
<-set.bind(updates)
return set
}

Expand Down
4 changes: 0 additions & 4 deletions discover/client_test.go
Expand Up @@ -27,10 +27,6 @@ func TestClient(t *testing.T) {
t.Fatal("Registering service failed", err.Error())
}
set := client.Services(serviceName)
ch := make(chan *ServiceUpdate)
set.Subscribe(ch)
<-ch
<-ch
if len(set.Online()) < 2 {
t.Fatal("Registered services not online")
}
Expand Down

0 comments on commit e1e1977

Please sign in to comment.