Skip to content

Commit

Permalink
fix(pubsublite): ensure api clients are closed when startup fails (#4239
Browse files Browse the repository at this point in the history
)

- Ensure that NewSubscriber and NewPublisher constructors close all created API clients if they partially fail.
- Added an apiClientService base class to handle closing clients when startup fails, as well as after shutdown.
  • Loading branch information
tmdiep committed Jun 10, 2021
1 parent 16d4b72 commit 55025a1
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 46 deletions.
18 changes: 8 additions & 10 deletions pubsublite/internal/wire/publisher.go
Expand Up @@ -279,7 +279,6 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() {
// count, but not decreasing.
type routingPublisher struct {
// Immutable after creation.
clients apiClients
msgRouterFactory *messageRouterFactory
pubFactory *singlePartitionPublisherFactory
partitionWatcher *partitionCountWatcher
Expand All @@ -288,12 +287,12 @@ type routingPublisher struct {
msgRouter messageRouter
publishers []*singlePartitionPublisher

compositeService
apiClientService
}

func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
pub := &routingPublisher{
clients: allClients,
apiClientService: apiClientService{clients: allClients},
msgRouterFactory: msgRouterFactory,
pubFactory: pubFactory,
}
Expand Down Expand Up @@ -359,12 +358,6 @@ func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePart
return rp.publishers[partition], nil
}

func (rp *routingPublisher) WaitStopped() error {
err := rp.compositeService.WaitStopped()
rp.clients.Close()
return err
}

// Publisher is the client interface exported from this package for publishing
// messages.
type Publisher interface {
Expand All @@ -385,15 +378,20 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa
if err := validatePublishSettings(settings); err != nil {
return nil, err
}

var allClients apiClients
pubClient, err := newPublisherClient(ctx, region, opts...)
if err != nil {
return nil, err
}
allClients = append(allClients, pubClient)

adminClient, err := NewAdminClient(ctx, region, opts...)
if err != nil {
allClients.Close()
return nil, err
}
allClients := apiClients{pubClient, adminClient}
allClients = append(allClients, adminClient)

msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano())))
pubFactory := &singlePartitionPublisherFactory{
Expand Down
15 changes: 0 additions & 15 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -165,21 +165,6 @@ func defaultClientOptions(region string) []option.ClientOption {
}
}

type apiClient interface {
Close() error
}

type apiClients []apiClient

func (ac apiClients) Close() (retErr error) {
for _, c := range ac {
if err := c.Close(); retErr == nil {
retErr = err
}
}
return
}

// NewAdminClient creates a new gapic AdminClient for a region.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
options := append(defaultClientOptions(region), opts...)
Expand Down
36 changes: 36 additions & 0 deletions pubsublite/internal/wire/service.go
Expand Up @@ -342,3 +342,39 @@ func removeFromSlice(services []service, removeIdx int) []service {
services[lastIdx] = nil
return services[:lastIdx]
}

type apiClient interface {
Close() error
}

type apiClients []apiClient

func (ac apiClients) Close() (retErr error) {
for _, c := range ac {
if err := c.Close(); retErr == nil {
retErr = err
}
}
return
}

// A compositeService that handles closing API clients on shutdown.
type apiClientService struct {
clients apiClients

compositeService
}

func (acs *apiClientService) WaitStarted() error {
err := acs.compositeService.WaitStarted()
if err != nil {
acs.WaitStopped()
}
return err
}

func (acs *apiClientService) WaitStopped() error {
err := acs.compositeService.WaitStopped()
acs.clients.Close()
return err
}
34 changes: 13 additions & 21 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -402,15 +402,14 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu
// partitions.
type multiPartitionSubscriber struct {
// Immutable after creation.
clients apiClients
subscribers []*singlePartitionSubscriber

compositeService
apiClientService
}

func newMultiPartitionSubscriber(allClients apiClients, subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := &multiPartitionSubscriber{
clients: allClients,
apiClientService: apiClientService{clients: allClients},
}
ms.init()

Expand All @@ -433,33 +432,26 @@ func (ms *multiPartitionSubscriber) Terminate() {
}
}

func (ms *multiPartitionSubscriber) WaitStopped() error {
err := ms.compositeService.WaitStopped()
ms.clients.Close()
return err
}

// assigningSubscriber uses the Pub/Sub Lite partition assignment service to
// listen to its assigned partition numbers and dynamically add/remove
// singlePartitionSubscribers.
type assigningSubscriber struct {
// Immutable after creation.
clients apiClients
subFactory *singlePartitionSubscriberFactory
assigner *assigner

// Fields below must be guarded with mu.
// Subscribers keyed by partition number. Updated as assignments change.
subscribers map[int]*singlePartitionSubscriber

compositeService
apiClientService
}

func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) {
as := &assigningSubscriber{
clients: allClients,
subFactory: subFactory,
subscribers: make(map[int]*singlePartitionSubscriber),
apiClientService: apiClientService{clients: allClients},
subFactory: subFactory,
subscribers: make(map[int]*singlePartitionSubscriber),
}
as.init()

Expand Down Expand Up @@ -515,12 +507,6 @@ func (as *assigningSubscriber) Terminate() {
}
}

func (as *assigningSubscriber) WaitStopped() error {
err := as.compositeService.WaitStopped()
as.clients.Close()
return err
}

// Subscriber is the client interface exported from this package for receiving
// messages.
type Subscriber interface {
Expand All @@ -539,15 +525,20 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa
if err := validateReceiveSettings(settings); err != nil {
return nil, err
}

var allClients apiClients
subClient, err := newSubscriberClient(ctx, region, opts...)
if err != nil {
return nil, err
}
allClients = append(allClients, subClient)

cursorClient, err := newCursorClient(ctx, region, opts...)
if err != nil {
allClients.Close()
return nil, err
}
allClients := apiClients{subClient, cursorClient}
allClients = append(allClients, cursorClient)

subFactory := &singlePartitionSubscriberFactory{
ctx: ctx,
Expand All @@ -563,6 +554,7 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa
}
partitionClient, err := newPartitionAssignmentClient(ctx, region, opts...)
if err != nil {
allClients.Close()
return nil, err
}
allClients = append(allClients, partitionClient)
Expand Down

0 comments on commit 55025a1

Please sign in to comment.