Skip to content

Commit

Permalink
refactor(pubsublite): simplify service (#3393)
Browse files Browse the repository at this point in the history
No-op refactoring and additional error checking.
  • Loading branch information
tmdiep committed Dec 22, 2020
1 parent 87b972c commit 404b6c6
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 42 deletions.
63 changes: 32 additions & 31 deletions pubsublite/internal/wire/service.go
Expand Up @@ -14,6 +14,7 @@
package wire

import (
"errors"
"sync"
)

Expand Down Expand Up @@ -56,6 +57,7 @@ type service interface {
AddStatusChangeReceiver(serviceHandle, serviceStatusChangeFunc)
RemoveStatusChangeReceiver(serviceHandle)
Handle() serviceHandle
Status() serviceStatus
Error() error
}

Expand Down Expand Up @@ -153,10 +155,7 @@ func (as *abstractService) unsafeUpdateStatus(targetStatus serviceStatus, err er
return true
}

type serviceHolder struct {
service service
lastStatus serviceStatus
}
var errChildServiceStarted = errors.New("pubsublite: dependent service must not be started")

// compositeService can be embedded into other structs to manage child services.
// It implements the service interface and can itself be a dependency of another
Expand All @@ -169,8 +168,8 @@ type compositeService struct {
waitStarted chan struct{}
waitTerminated chan struct{}

dependencies []*serviceHolder
removed []*serviceHolder
dependencies []service
removed []service

abstractService
}
Expand All @@ -188,7 +187,7 @@ func (cs *compositeService) Start() {

if cs.abstractService.unsafeUpdateStatus(serviceStarting, nil) {
for _, s := range cs.dependencies {
s.service.Start()
s.Start()
}
}
}
Expand Down Expand Up @@ -218,24 +217,30 @@ func (cs *compositeService) unsafeAddServices(services ...service) error {
}

for _, s := range services {
// Adding dependent services which have already started not currently
// supported. Requires updating logic to handle the compositeService state.
if s.Status() > serviceUninitialized {
return errChildServiceStarted
}

s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange)
cs.dependencies = append(cs.dependencies, &serviceHolder{service: s})
cs.dependencies = append(cs.dependencies, s)
if cs.status > serviceUninitialized {
s.Start()
}
}
return nil
}

func (cs *compositeService) unsafeRemoveService(service service) {
func (cs *compositeService) unsafeRemoveService(remove service) {
removeIdx := -1
for i, s := range cs.dependencies {
if s.service.Handle() == service.Handle() {
if s.Handle() == remove.Handle() {
// Move from the `dependencies` to the `removed` list.
cs.removed = append(cs.removed, s)
removeIdx = i
if s.lastStatus < serviceTerminating {
s.service.Stop()
if s.Status() < serviceTerminating {
s.Stop()
}
break
}
Expand All @@ -244,20 +249,21 @@ func (cs *compositeService) unsafeRemoveService(service service) {
}

func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
for _, s := range cs.dependencies {
if s.lastStatus < serviceTerminating {
s.service.Stop()
if cs.unsafeUpdateStatus(targetStatus, err) {
for _, s := range cs.dependencies {
if s.Status() < serviceTerminating {
s.Stop()
}
}
}
cs.unsafeUpdateStatus(targetStatus, err)
}

func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err error) (ret bool) {
previousStatus := cs.status
if ret = cs.abstractService.unsafeUpdateStatus(targetStatus, err); ret {
// Note: the waitStarted channel must be closed when the service fails to
// start.
if previousStatus == serviceStarting {
if previousStatus < serviceActive && targetStatus >= serviceActive {
close(cs.waitStarted)
}
if targetStatus == serviceTerminated {
Expand All @@ -273,28 +279,23 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s

removeIdx := -1
for i, s := range cs.removed {
if s.service.Handle() == handle {
if s.Handle() == handle {
if status == serviceTerminated {
s.service.RemoveStatusChangeReceiver(cs.Handle())
s.RemoveStatusChangeReceiver(cs.Handle())
removeIdx = i
}
break
}
}
if removeIdx >= 0 {
cs.removed = removeFromSlice(cs.removed, removeIdx)
}
cs.removed = removeFromSlice(cs.removed, removeIdx)

// Note: we cannot rely on the service not being in the removed list above to
// determine whether it is an active dependency. The notification may be for a
// service that is no longer in cs.removed or cs.dependencies, because status
// changes are notified asynchronously and may be received out of order.
isDependency := false
for _, s := range cs.dependencies {
if s.service.Handle() == handle {
if status > s.lastStatus {
s.lastStatus = status
}
if s.Handle() == handle {
isDependency = true
break
}
Expand All @@ -307,13 +308,13 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s
numTerminated := 0

for _, s := range cs.dependencies {
if shouldTerminate && s.lastStatus < serviceTerminating {
s.service.Stop()
if shouldTerminate && s.Status() < serviceTerminating {
s.Stop()
}
if s.lastStatus >= serviceActive {
if s.Status() >= serviceActive {
numStarted++
}
if s.lastStatus == serviceTerminated {
if s.Status() == serviceTerminated {
numTerminated++
}
}
Expand All @@ -328,7 +329,7 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s
}
}

func removeFromSlice(services []*serviceHolder, removeIdx int) []*serviceHolder {
func removeFromSlice(services []service, removeIdx int) []service {
lastIdx := len(services) - 1
if removeIdx < 0 || removeIdx > lastIdx {
return services
Expand Down
57 changes: 46 additions & 11 deletions pubsublite/internal/wire/service_test.go
Expand Up @@ -202,10 +202,10 @@ func newTestCompositeService(name string) *testCompositeService {
return ts
}

func (ts *testCompositeService) AddServices(services ...service) {
func (ts *testCompositeService) AddServices(services ...service) error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.unsafeAddServices(services...)
return ts.unsafeAddServices(services...)
}

func (ts *testCompositeService) RemoveService(service service) {
Expand All @@ -231,7 +231,9 @@ func TestCompositeServiceNormalStop(t *testing.T) {
child2 := newTestService("child2")
child3 := newTestService("child3")
parent := newTestCompositeService("parent")
parent.AddServices(child1, child2)
if err := parent.AddServices(child1, child2); err != nil {
t.Errorf("AddServices() got err: %v", err)
}

t.Run("Starting", func(t *testing.T) {
wantState := serviceUninitialized
Expand All @@ -252,7 +254,9 @@ func TestCompositeServiceNormalStop(t *testing.T) {
if child3.Status() != wantState {
t.Errorf("child3: current service status: got %d, want %d", child3.Status(), wantState)
}
parent.AddServices(child3)
if err := parent.AddServices(child3); err != nil {
t.Errorf("AddServices() got err: %v", err)
}
child3.receiver.VerifyStatus(t, serviceStarting)
})

Expand Down Expand Up @@ -300,7 +304,9 @@ func TestCompositeServiceErrorDuringStartup(t *testing.T) {
child1 := newTestService("child1")
child2 := newTestService("child2")
parent := newTestCompositeService("parent")
parent.AddServices(child1, child2)
if err := parent.AddServices(child1, child2); err != nil {
t.Errorf("AddServices() got err: %v", err)
}

t.Run("Starting", func(t *testing.T) {
parent.Start()
Expand Down Expand Up @@ -334,7 +340,9 @@ func TestCompositeServiceErrorWhileActive(t *testing.T) {
child1 := newTestService("child1")
child2 := newTestService("child2")
parent := newTestCompositeService("parent")
parent.AddServices(child1, child2)
if err := parent.AddServices(child1, child2); err != nil {
t.Errorf("AddServices() got err: %v", err)
}

t.Run("Starting", func(t *testing.T) {
parent.Start()
Expand Down Expand Up @@ -382,7 +390,9 @@ func TestCompositeServiceRemoveService(t *testing.T) {
child1 := newTestService("child1")
child2 := newTestService("child2")
parent := newTestCompositeService("parent")
parent.AddServices(child1, child2)
if err := parent.AddServices(child1, child2); err != nil {
t.Errorf("AddServices() got err: %v", err)
}

t.Run("Starting", func(t *testing.T) {
parent.Start()
Expand Down Expand Up @@ -452,16 +462,21 @@ func TestCompositeServiceTree(t *testing.T) {
leaf1 := newTestService("leaf1")
leaf2 := newTestService("leaf2")
intermediate1 := newTestCompositeService("intermediate1")
intermediate1.AddServices(leaf1, leaf2)
if err := intermediate1.AddServices(leaf1, leaf2); err != nil {
t.Errorf("intermediate1.AddServices() got err: %v", err)
}

leaf3 := newTestService("leaf3")
leaf4 := newTestService("leaf4")
intermediate2 := newTestCompositeService("intermediate2")
intermediate2.AddServices(leaf3, leaf4)
if err := intermediate2.AddServices(leaf3, leaf4); err != nil {
t.Errorf("intermediate2.AddServices() got err: %v", err)
}

root := newTestCompositeService("root")
root.AddServices(intermediate1, intermediate2)

if err := root.AddServices(intermediate1, intermediate2); err != nil {
t.Errorf("root.AddServices() got err: %v", err)
}
wantErr := errors.New("fail")

t.Run("Starting", func(t *testing.T) {
Expand Down Expand Up @@ -528,3 +543,23 @@ func TestCompositeServiceTree(t *testing.T) {
}
})
}

func TestCompositeServiceAddServicesErrors(t *testing.T) {
child1 := newTestService("child1")
parent := newTestCompositeService("parent")
if err := parent.AddServices(child1); err != nil {
t.Errorf("AddServices(child1) got err: %v", err)
}

child2 := newTestService("child2")
child2.Start()
if gotErr, wantErr := parent.AddServices(child2), errChildServiceStarted; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("AddServices(child2) got err: (%v), want err: (%v)", gotErr, wantErr)
}

parent.Stop()
child3 := newTestService("child3")
if gotErr, wantErr := parent.AddServices(child3), ErrServiceStopped; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("AddServices(child3) got err: (%v), want err: (%v)", gotErr, wantErr)
}
}

0 comments on commit 404b6c6

Please sign in to comment.