diff --git a/discover/backend_etcd.go b/discover/backend_etcd.go index 899aebe3ed..a7cf8b3408 100644 --- a/discover/backend_etcd.go +++ b/discover/backend_etcd.go @@ -39,6 +39,7 @@ func (b *EtcdBackend) Subscribe(name string) (UpdateStream, error) { stream.ch <- update } } + stream.ch <- &ServiceUpdate{} for u := range watch { if update := b.responseToUpdate(u); update != nil { stream.ch <- update diff --git a/discover/backend_etcd_test.go b/discover/backend_etcd_test.go index 33e7a8b662..eccc2ad3e2 100644 --- a/discover/backend_etcd_test.go +++ b/discover/backend_etcd_test.go @@ -69,8 +69,11 @@ func TestEtcdBackend_Subscribe(t *testing.T) { backend.Register("test_subscribe", "10.0.0.4", nil) defer backend.Unregister("test_subscribe", "10.0.0.4") - for i := 0; i < 4; i++ { + for i := 0; i < 5; i++ { update := <-updates.Chan() + if update.Addr == "" && update.Name == "" { + continue // skip the update that signals "up to current" event + } if update.Online != true { t.Fatal("Unexpected offline service update: ", update, i) } diff --git a/discover/client.go b/discover/client.go index 69a19218db..bdc685fd9b 100644 --- a/discover/client.go +++ b/discover/client.go @@ -29,9 +29,18 @@ type ServiceSet struct { lisMutex sync.Mutex } -func (s *ServiceSet) Bind(updates chan *ServiceUpdate) { +func (s *ServiceSet) bind(updates chan *ServiceUpdate) chan bool { + // current is an event when enough service updates have been + // received to bring us to "current" state (when subscribed) + current := make(chan bool) go func() { + isCurrent := false for update := range updates { + if update.Addr == "" && update.Name == "" && !isCurrent { + current <- true + isCurrent = true + continue + } // TODO: apply filters s.serMutex.Lock() if _, exists := s.services[update.Addr]; !exists { @@ -55,6 +64,7 @@ func (s *ServiceSet) Bind(updates chan *ServiceUpdate) { } } }() + return current } func (s *ServiceSet) Online() []*Service { @@ -149,7 +159,7 @@ func (c *DiscoverClient) Services(name string) *ServiceSet { filters: make(map[string]string), listeners: make(map[chan *ServiceUpdate]struct{}), } - set.Bind(updates) + <-set.bind(updates) return set } diff --git a/discover/client_test.go b/discover/client_test.go index cdb339bab0..8ce96ea9af 100644 --- a/discover/client_test.go +++ b/discover/client_test.go @@ -27,10 +27,6 @@ func TestClient(t *testing.T) { t.Fatal("Registering service failed", err.Error()) } set := client.Services(serviceName) - ch := make(chan *ServiceUpdate) - set.Subscribe(ch) - <-ch - <-ch if len(set.Online()) < 2 { t.Fatal("Registered services not online") }