diff --git a/pubsublite/internal/wire/service.go b/pubsublite/internal/wire/service.go index 380df2a1c39..2ae68d106a8 100644 --- a/pubsublite/internal/wire/service.go +++ b/pubsublite/internal/wire/service.go @@ -25,17 +25,17 @@ type serviceStatus int const ( // Service has not been started. - serviceUninitialized serviceStatus = 0 + serviceUninitialized serviceStatus = iota // Service is starting up. - serviceStarting serviceStatus = 1 + serviceStarting // Service is active and accepting new data. Note that the underlying stream // may be reconnecting due to retryable errors. - serviceActive serviceStatus = 2 + serviceActive // Service is gracefully shutting down by flushing all pending data. No new // data is accepted. - serviceTerminating serviceStatus = 3 + serviceTerminating // Service has terminated. No new data is accepted. - serviceTerminated serviceStatus = 4 + serviceTerminated ) // serviceHandle is used to compare pointers to service instances. @@ -168,8 +168,10 @@ type compositeService struct { waitStarted chan struct{} waitTerminated chan struct{} - dependencies []service - removed []service + // Current dependencies. + dependencies map[serviceHandle]service + // Removed dependencies that are in the process of terminating. + removed map[serviceHandle]service abstractService } @@ -178,6 +180,8 @@ type compositeService struct { func (cs *compositeService) init() { cs.waitStarted = make(chan struct{}) cs.waitTerminated = make(chan struct{}) + cs.dependencies = make(map[serviceHandle]service) + cs.removed = make(map[serviceHandle]service) } // Start up dependencies. @@ -224,7 +228,7 @@ func (cs *compositeService) unsafeAddServices(services ...service) error { } s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange) - cs.dependencies = append(cs.dependencies, s) + cs.dependencies[s.Handle()] = s if cs.status > serviceUninitialized { s.Start() } @@ -233,19 +237,15 @@ func (cs *compositeService) unsafeAddServices(services ...service) error { } func (cs *compositeService) unsafeRemoveService(remove service) { - removeIdx := -1 - for i, s := range cs.dependencies { - if s.Handle() == remove.Handle() { - // Move from the `dependencies` to the `removed` list. - cs.removed = append(cs.removed, s) - removeIdx = i - if s.Status() < serviceTerminating { - s.Stop() - } - break - } + if _, present := cs.dependencies[remove.Handle()]; !present { + return + } + delete(cs.dependencies, remove.Handle()) + // The service will be completely removed after it has terminated. + cs.removed[remove.Handle()] = remove + if remove.Status() < serviceTerminating { + remove.Stop() } - cs.dependencies = removeFromSlice(cs.dependencies, removeIdx) } func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { @@ -277,29 +277,18 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s cs.mu.Lock() defer cs.mu.Unlock() - removeIdx := -1 - for i, s := range cs.removed { - if s.Handle() == handle { - if status == serviceTerminated { - s.RemoveStatusChangeReceiver(cs.Handle()) - removeIdx = i - } - break + if removedService, present := cs.removed[handle]; present { + if status == serviceTerminated { + removedService.RemoveStatusChangeReceiver(cs.Handle()) + delete(cs.removed, handle) } } - cs.removed = removeFromSlice(cs.removed, removeIdx) - // Note: we cannot rely on the service not being in the removed list above to + // Note: we cannot rely on the service not being in the `removed` map to // determine whether it is an active dependency. The notification may be for a // service that is no longer in cs.removed or cs.dependencies, because status // changes are notified asynchronously and may be received out of order. - isDependency := false - for _, s := range cs.dependencies { - if s.Handle() == handle { - isDependency = true - break - } - } + _, isDependency := cs.dependencies[handle] // If a single service terminates, stop them all, but allow the others to // flush pending data. Ignore removed services that are stopping. @@ -329,20 +318,6 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s } } -func removeFromSlice(services []service, removeIdx int) []service { - lastIdx := len(services) - 1 - if removeIdx < 0 || removeIdx > lastIdx { - return services - } - - // Swap with last element, erase last element and truncate the slice. - if removeIdx != lastIdx { - services[removeIdx] = services[lastIdx] - } - services[lastIdx] = nil - return services[:lastIdx] -} - type apiClient interface { Close() error } diff --git a/pubsublite/internal/wire/service_test.go b/pubsublite/internal/wire/service_test.go index 0dd9b9be4fd..c6910296950 100644 --- a/pubsublite/internal/wire/service_test.go +++ b/pubsublite/internal/wire/service_test.go @@ -412,6 +412,13 @@ func TestCompositeServiceRemoveService(t *testing.T) { }) t.Run("Remove service", func(t *testing.T) { + if got, want := parent.DependenciesLen(), 2; got != want { + t.Errorf("compositeService.dependencies: got len %d, want %d", got, want) + } + if got, want := parent.RemovedLen(), 0; got != want { + t.Errorf("compositeService.removed: got len %d, want %d", got, want) + } + // Removing child1 should stop it, but leave everything else active. parent.RemoveService(child1) @@ -432,6 +439,9 @@ func TestCompositeServiceRemoveService(t *testing.T) { child1.receiver.VerifyStatus(t, serviceTerminated) child2.receiver.VerifyNoStatusChanges(t) parent.receiver.VerifyNoStatusChanges(t) + if got, want := parent.Status(), serviceActive; got != want { + t.Errorf("compositeService.Status() got %v, want %v", got, want) + } }) t.Run("Terminating", func(t *testing.T) {