Skip to content

Commit

Permalink
feat(pubsublite): Abstraction for leaf and composite services (#3143)
Browse files Browse the repository at this point in the history
Pub/Sub Lite publisher and subscriber clients will consist of a hierarchy of services.
  • Loading branch information
tmdiep committed Nov 12, 2020
1 parent 23b6904 commit 869bd24
Show file tree
Hide file tree
Showing 3 changed files with 886 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pubsublite/internal/wire/errors.go
Expand Up @@ -19,4 +19,17 @@ var (
// ErrOverflow indicates that the publish buffers have overflowed. See
// comments for PublishSettings.BufferedByteLimit.
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")

// ErrServiceUninitialized indicates that a service (e.g. publisher or
// subscriber) cannot perform an operation because it is uninitialized.
ErrServiceUninitialized = errors.New("pubsublite: service must be started")

// ErrServiceStarting indicates that a service (e.g. publisher or subscriber)
// cannot perform an operation because it is starting up.
ErrServiceStarting = errors.New("pubsublite: service is starting up")

// ErrServiceStopped indicates that a service (e.g. publisher or subscriber)
// cannot perform an operation because it has stoped or is in the process of
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")
)
343 changes: 343 additions & 0 deletions pubsublite/internal/wire/service.go
@@ -0,0 +1,343 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"sync"
)

// serviceStatus specifies the current status of the service. The order of the
// values reflects the lifecycle of services. Note that some statuses may be
// skipped.
type serviceStatus int

const (
// Service has not been started.
serviceUninitialized serviceStatus = 0
// Service is starting up.
serviceStarting serviceStatus = 1
// Service is active and accepting new data. Note that the underlying stream
// may be reconnecting due to retryable errors.
serviceActive serviceStatus = 2
// Service is gracefully shutting down by flushing all pending data. No new
// data is accepted.
serviceTerminating serviceStatus = 3
// Service has terminated. No new data is accepted.
serviceTerminated serviceStatus = 4
)

// serviceHandle is used to compare pointers to service instances.
type serviceHandle interface{}

// serviceStatusChangeFunc notifies the parent of service status changes.
// `serviceTerminating` and `serviceTerminated` have an associated error. This
// error may be nil if the user called Stop().
type serviceStatusChangeFunc func(serviceHandle, serviceStatus, error)

// service is the interface that must be implemented by services (essentially
// gRPC client stream wrappers, e.g. subscriber, publisher) that can be
// dependencies of a compositeService.
type service interface {
Start()
Stop()

// Methods below are implemented by abstractService.
AddStatusChangeReceiver(serviceHandle, serviceStatusChangeFunc)
RemoveStatusChangeReceiver(serviceHandle)
Handle() serviceHandle
Error() error
}

// abstractService can be embedded into other structs to provide common
// functionality for managing service status and status change receivers.
type abstractService struct {
mu sync.Mutex
statusChangeReceivers []*statusChangeReceiver
status serviceStatus
// The error that cause the service to terminate.
err error
}

type statusChangeReceiver struct {
handle serviceHandle // For removing the receiver.
onStatusChange serviceStatusChangeFunc
}

func (as *abstractService) AddStatusChangeReceiver(handle serviceHandle, onStatusChange serviceStatusChangeFunc) {
as.mu.Lock()
defer as.mu.Unlock()
as.statusChangeReceivers = append(
as.statusChangeReceivers,
&statusChangeReceiver{handle, onStatusChange})
}

func (as *abstractService) RemoveStatusChangeReceiver(handle serviceHandle) {
as.mu.Lock()
defer as.mu.Unlock()

for i := len(as.statusChangeReceivers) - 1; i >= 0; i-- {
r := as.statusChangeReceivers[i]
if r.handle == handle {
// Swap with last element, erase last element and truncate the slice.
lastIdx := len(as.statusChangeReceivers) - 1
if i != lastIdx {
as.statusChangeReceivers[i] = as.statusChangeReceivers[lastIdx]
}
as.statusChangeReceivers[lastIdx] = nil
as.statusChangeReceivers = as.statusChangeReceivers[:lastIdx]
}
}
}

// Handle identifies this service instance, even when there are multiple layers
// of embedding.
func (as *abstractService) Handle() serviceHandle {
return as
}

func (as *abstractService) Error() error {
as.mu.Lock()
defer as.mu.Unlock()
return as.err
}

func (as *abstractService) Status() serviceStatus {
as.mu.Lock()
defer as.mu.Unlock()
return as.status
}

func (as *abstractService) unsafeCheckServiceStatus() error {
switch {
case as.status == serviceUninitialized:
return ErrServiceUninitialized
case as.status == serviceStarting:
return ErrServiceStarting
case as.status >= serviceTerminating:
return ErrServiceStopped
default:
return nil
}
}

// unsafeUpdateStatus assumes the service is already holding a mutex when
// called, as it often needs to be atomic with other operations.
func (as *abstractService) unsafeUpdateStatus(targetStatus serviceStatus, err error) bool {
if as.status >= targetStatus {
// Already at the same or later stage of the service lifecycle.
return false
}

as.status = targetStatus
if as.err == nil {
// Prevent clobbering original error.
as.err = err
}

for _, receiver := range as.statusChangeReceivers {
// Notify in a goroutine to prevent deadlocks if the receiver is holding a
// locked mutex.
go receiver.onStatusChange(as.Handle(), as.status, as.err)
}
return true
}

type serviceHolder struct {
service service
lastStatus serviceStatus
}

// compositeService can be embedded into other structs to manage child services.
// It implements the service interface and can itself be a dependency of another
// compositeService.
//
// If one child service terminates due to a permanent failure, all other child
// services are stopped. Child services can be added and removed dynamically.
type compositeService struct {
// Used to block until all dependencies have started or terminated.
waitStarted chan struct{}
waitTerminated chan struct{}

dependencies []*serviceHolder
removed []*serviceHolder

abstractService
}

// init must be called after creation of the derived struct.
func (cs *compositeService) init() {
cs.waitStarted = make(chan struct{})
cs.waitTerminated = make(chan struct{})
}

// Start up dependencies.
func (cs *compositeService) Start() {
cs.mu.Lock()
defer cs.mu.Unlock()

if cs.abstractService.unsafeUpdateStatus(serviceStarting, nil) {
for _, s := range cs.dependencies {
s.service.Start()
}
}
}

// WaitStarted waits for all dependencies to start.
func (cs *compositeService) WaitStarted() error {
<-cs.waitStarted
return cs.Error()
}

// Stop all dependencies.
func (cs *compositeService) Stop() {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.unsafeInitiateShutdown(serviceTerminating, nil)
}

// WaitStopped waits for all dependencies to stop.
func (cs *compositeService) WaitStopped() error {
<-cs.waitTerminated
return cs.Error()
}

func (cs *compositeService) unsafeAddServices(services ...service) error {
if cs.status >= serviceTerminating {
return ErrServiceStopped
}

for _, s := range services {
s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange)
cs.dependencies = append(cs.dependencies, &serviceHolder{service: s})
if cs.status > serviceUninitialized {
s.Start()
}
}
return nil
}

func (cs *compositeService) unsafeRemoveService(service service) {
removeIdx := -1
for i, s := range cs.dependencies {
if s.service.Handle() == service.Handle() {
// Move from the `dependencies` to the `removed` list.
cs.removed = append(cs.removed, s)
removeIdx = i
if s.lastStatus < serviceTerminating {
s.service.Stop()
}
break
}
}
cs.dependencies = removeFromSlice(cs.dependencies, removeIdx)
}

func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
for _, s := range cs.dependencies {
if s.lastStatus < serviceTerminating {
s.service.Stop()
}
}
cs.unsafeUpdateStatus(targetStatus, err)
}

func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err error) (ret bool) {
previousStatus := cs.status
if ret = cs.abstractService.unsafeUpdateStatus(targetStatus, err); ret {
// Note: the waitStarted channel must be closed when the service fails to
// start.
if previousStatus == serviceStarting {
close(cs.waitStarted)
}
if targetStatus == serviceTerminated {
close(cs.waitTerminated)
}
}
return
}

func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status serviceStatus, err error) {
cs.mu.Lock()
defer cs.mu.Unlock()

removeIdx := -1
for i, s := range cs.removed {
if s.service.Handle() == handle {
if status == serviceTerminated {
s.service.RemoveStatusChangeReceiver(cs.Handle())
removeIdx = i
}
break
}
}
if removeIdx >= 0 {
cs.removed = removeFromSlice(cs.removed, removeIdx)
}

// Note: we cannot rely on the service not being in the removed list above to
// determine whether it is an active dependency. The notification may be for a
// service that is no longer in cs.removed or cs.dependencies, because status
// changes are notified asynchronously and may be received out of order.
isDependency := false
for _, s := range cs.dependencies {
if s.service.Handle() == handle {
if status > s.lastStatus {
s.lastStatus = status
}
isDependency = true
break
}
}

// If a single service terminates, stop them all, but allow the others to
// flush pending data. Ignore removed services that are stopping.
shouldTerminate := status >= serviceTerminating && isDependency
numStarted := 0
numTerminated := 0

for _, s := range cs.dependencies {
if shouldTerminate && s.lastStatus < serviceTerminating {
s.service.Stop()
}
if s.lastStatus >= serviceActive {
numStarted++
}
if s.lastStatus == serviceTerminated {
numTerminated++
}
}

switch {
case numTerminated == len(cs.dependencies) && len(cs.removed) == 0:
cs.unsafeUpdateStatus(serviceTerminated, err)
case shouldTerminate:
cs.unsafeUpdateStatus(serviceTerminating, err)
case numStarted == len(cs.dependencies):
cs.unsafeUpdateStatus(serviceActive, err)
}
}

func removeFromSlice(services []*serviceHolder, removeIdx int) []*serviceHolder {
lastIdx := len(services) - 1
if removeIdx < 0 || removeIdx > lastIdx {
return services
}

// Swap with last element, erase last element and truncate the slice.
if removeIdx != lastIdx {
services[removeIdx] = services[lastIdx]
}
services[lastIdx] = nil
return services[:lastIdx]
}

0 comments on commit 869bd24

Please sign in to comment.