Skip to content

Commit

Permalink
refactor(pubsublite): change slices to maps in compositeService (#4243)
Browse files Browse the repository at this point in the history
This simplifies lookups and removals.
  • Loading branch information
tmdiep committed Jun 15, 2021
1 parent e1304f4 commit a3249e1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 51 deletions.
77 changes: 26 additions & 51 deletions pubsublite/internal/wire/service.go
Expand Up @@ -25,17 +25,17 @@ type serviceStatus int

const (
// Service has not been started.
serviceUninitialized serviceStatus = 0
serviceUninitialized serviceStatus = iota
// Service is starting up.
serviceStarting serviceStatus = 1
serviceStarting
// Service is active and accepting new data. Note that the underlying stream
// may be reconnecting due to retryable errors.
serviceActive serviceStatus = 2
serviceActive
// Service is gracefully shutting down by flushing all pending data. No new
// data is accepted.
serviceTerminating serviceStatus = 3
serviceTerminating
// Service has terminated. No new data is accepted.
serviceTerminated serviceStatus = 4
serviceTerminated
)

// serviceHandle is used to compare pointers to service instances.
Expand Down Expand Up @@ -168,8 +168,10 @@ type compositeService struct {
waitStarted chan struct{}
waitTerminated chan struct{}

dependencies []service
removed []service
// Current dependencies.
dependencies map[serviceHandle]service
// Removed dependencies that are in the process of terminating.
removed map[serviceHandle]service

abstractService
}
Expand All @@ -178,6 +180,8 @@ type compositeService struct {
func (cs *compositeService) init() {
cs.waitStarted = make(chan struct{})
cs.waitTerminated = make(chan struct{})
cs.dependencies = make(map[serviceHandle]service)
cs.removed = make(map[serviceHandle]service)
}

// Start up dependencies.
Expand Down Expand Up @@ -224,7 +228,7 @@ func (cs *compositeService) unsafeAddServices(services ...service) error {
}

s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange)
cs.dependencies = append(cs.dependencies, s)
cs.dependencies[s.Handle()] = s
if cs.status > serviceUninitialized {
s.Start()
}
Expand All @@ -233,19 +237,15 @@ func (cs *compositeService) unsafeAddServices(services ...service) error {
}

func (cs *compositeService) unsafeRemoveService(remove service) {
removeIdx := -1
for i, s := range cs.dependencies {
if s.Handle() == remove.Handle() {
// Move from the `dependencies` to the `removed` list.
cs.removed = append(cs.removed, s)
removeIdx = i
if s.Status() < serviceTerminating {
s.Stop()
}
break
}
if _, present := cs.dependencies[remove.Handle()]; !present {
return
}
delete(cs.dependencies, remove.Handle())
// The service will be completely removed after it has terminated.
cs.removed[remove.Handle()] = remove
if remove.Status() < serviceTerminating {
remove.Stop()
}
cs.dependencies = removeFromSlice(cs.dependencies, removeIdx)
}

func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
Expand Down Expand Up @@ -277,29 +277,18 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s
cs.mu.Lock()
defer cs.mu.Unlock()

removeIdx := -1
for i, s := range cs.removed {
if s.Handle() == handle {
if status == serviceTerminated {
s.RemoveStatusChangeReceiver(cs.Handle())
removeIdx = i
}
break
if removedService, present := cs.removed[handle]; present {
if status == serviceTerminated {
removedService.RemoveStatusChangeReceiver(cs.Handle())
delete(cs.removed, handle)
}
}
cs.removed = removeFromSlice(cs.removed, removeIdx)

// Note: we cannot rely on the service not being in the removed list above to
// Note: we cannot rely on the service not being in the `removed` map to
// determine whether it is an active dependency. The notification may be for a
// service that is no longer in cs.removed or cs.dependencies, because status
// changes are notified asynchronously and may be received out of order.
isDependency := false
for _, s := range cs.dependencies {
if s.Handle() == handle {
isDependency = true
break
}
}
_, isDependency := cs.dependencies[handle]

// If a single service terminates, stop them all, but allow the others to
// flush pending data. Ignore removed services that are stopping.
Expand Down Expand Up @@ -329,20 +318,6 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s
}
}

func removeFromSlice(services []service, removeIdx int) []service {
lastIdx := len(services) - 1
if removeIdx < 0 || removeIdx > lastIdx {
return services
}

// Swap with last element, erase last element and truncate the slice.
if removeIdx != lastIdx {
services[removeIdx] = services[lastIdx]
}
services[lastIdx] = nil
return services[:lastIdx]
}

type apiClient interface {
Close() error
}
Expand Down
10 changes: 10 additions & 0 deletions pubsublite/internal/wire/service_test.go
Expand Up @@ -412,6 +412,13 @@ func TestCompositeServiceRemoveService(t *testing.T) {
})

t.Run("Remove service", func(t *testing.T) {
if got, want := parent.DependenciesLen(), 2; got != want {
t.Errorf("compositeService.dependencies: got len %d, want %d", got, want)
}
if got, want := parent.RemovedLen(), 0; got != want {
t.Errorf("compositeService.removed: got len %d, want %d", got, want)
}

// Removing child1 should stop it, but leave everything else active.
parent.RemoveService(child1)

Expand All @@ -432,6 +439,9 @@ func TestCompositeServiceRemoveService(t *testing.T) {
child1.receiver.VerifyStatus(t, serviceTerminated)
child2.receiver.VerifyNoStatusChanges(t)
parent.receiver.VerifyNoStatusChanges(t)
if got, want := parent.Status(), serviceActive; got != want {
t.Errorf("compositeService.Status() got %v, want %v", got, want)
}
})

t.Run("Terminating", func(t *testing.T) {
Expand Down

0 comments on commit a3249e1

Please sign in to comment.