From 55025a1c6abe0ef4e57dd31347265aab3b78bdf8 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 10 Jun 2021 10:18:25 +1000 Subject: [PATCH] fix(pubsublite): ensure api clients are closed when startup fails (#4239) - Ensure that NewSubscriber and NewPublisher constructors close all created API clients if they partially fail. - Added an apiClientService base class to handle closing clients when startup fails, as well as after shutdown. --- pubsublite/internal/wire/publisher.go | 18 ++++++------- pubsublite/internal/wire/rpc.go | 15 ----------- pubsublite/internal/wire/service.go | 36 ++++++++++++++++++++++++++ pubsublite/internal/wire/subscriber.go | 34 ++++++++++-------------- 4 files changed, 57 insertions(+), 46 deletions(-) diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index f48e114ae17..740be64911b 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -279,7 +279,6 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() { // count, but not decreasing. type routingPublisher struct { // Immutable after creation. - clients apiClients msgRouterFactory *messageRouterFactory pubFactory *singlePartitionPublisherFactory partitionWatcher *partitionCountWatcher @@ -288,12 +287,12 @@ type routingPublisher struct { msgRouter messageRouter publishers []*singlePartitionPublisher - compositeService + apiClientService } func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher { pub := &routingPublisher{ - clients: allClients, + apiClientService: apiClientService{clients: allClients}, msgRouterFactory: msgRouterFactory, pubFactory: pubFactory, } @@ -359,12 +358,6 @@ func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePart return rp.publishers[partition], nil } -func (rp *routingPublisher) WaitStopped() error { - err := rp.compositeService.WaitStopped() - rp.clients.Close() - return err -} - // Publisher is the client interface exported from this package for publishing // messages. type Publisher interface { @@ -385,15 +378,20 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa if err := validatePublishSettings(settings); err != nil { return nil, err } + + var allClients apiClients pubClient, err := newPublisherClient(ctx, region, opts...) if err != nil { return nil, err } + allClients = append(allClients, pubClient) + adminClient, err := NewAdminClient(ctx, region, opts...) if err != nil { + allClients.Close() return nil, err } - allClients := apiClients{pubClient, adminClient} + allClients = append(allClients, adminClient) msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano()))) pubFactory := &singlePartitionPublisherFactory{ diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index 6519a4db225..0aa282057df 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -165,21 +165,6 @@ func defaultClientOptions(region string) []option.ClientOption { } } -type apiClient interface { - Close() error -} - -type apiClients []apiClient - -func (ac apiClients) Close() (retErr error) { - for _, c := range ac { - if err := c.Close(); retErr == nil { - retErr = err - } - } - return -} - // NewAdminClient creates a new gapic AdminClient for a region. func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { options := append(defaultClientOptions(region), opts...) diff --git a/pubsublite/internal/wire/service.go b/pubsublite/internal/wire/service.go index 565f39276d4..380df2a1c39 100644 --- a/pubsublite/internal/wire/service.go +++ b/pubsublite/internal/wire/service.go @@ -342,3 +342,39 @@ func removeFromSlice(services []service, removeIdx int) []service { services[lastIdx] = nil return services[:lastIdx] } + +type apiClient interface { + Close() error +} + +type apiClients []apiClient + +func (ac apiClients) Close() (retErr error) { + for _, c := range ac { + if err := c.Close(); retErr == nil { + retErr = err + } + } + return +} + +// A compositeService that handles closing API clients on shutdown. +type apiClientService struct { + clients apiClients + + compositeService +} + +func (acs *apiClientService) WaitStarted() error { + err := acs.compositeService.WaitStarted() + if err != nil { + acs.WaitStopped() + } + return err +} + +func (acs *apiClientService) WaitStopped() error { + err := acs.compositeService.WaitStopped() + acs.clients.Close() + return err +} diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 7dcb4f67f08..d7b793688df 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -402,15 +402,14 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu // partitions. type multiPartitionSubscriber struct { // Immutable after creation. - clients apiClients subscribers []*singlePartitionSubscriber - compositeService + apiClientService } func newMultiPartitionSubscriber(allClients apiClients, subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber { ms := &multiPartitionSubscriber{ - clients: allClients, + apiClientService: apiClientService{clients: allClients}, } ms.init() @@ -433,18 +432,11 @@ func (ms *multiPartitionSubscriber) Terminate() { } } -func (ms *multiPartitionSubscriber) WaitStopped() error { - err := ms.compositeService.WaitStopped() - ms.clients.Close() - return err -} - // assigningSubscriber uses the Pub/Sub Lite partition assignment service to // listen to its assigned partition numbers and dynamically add/remove // singlePartitionSubscribers. type assigningSubscriber struct { // Immutable after creation. - clients apiClients subFactory *singlePartitionSubscriberFactory assigner *assigner @@ -452,14 +444,14 @@ type assigningSubscriber struct { // Subscribers keyed by partition number. Updated as assignments change. subscribers map[int]*singlePartitionSubscriber - compositeService + apiClientService } func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) { as := &assigningSubscriber{ - clients: allClients, - subFactory: subFactory, - subscribers: make(map[int]*singlePartitionSubscriber), + apiClientService: apiClientService{clients: allClients}, + subFactory: subFactory, + subscribers: make(map[int]*singlePartitionSubscriber), } as.init() @@ -515,12 +507,6 @@ func (as *assigningSubscriber) Terminate() { } } -func (as *assigningSubscriber) WaitStopped() error { - err := as.compositeService.WaitStopped() - as.clients.Close() - return err -} - // Subscriber is the client interface exported from this package for receiving // messages. type Subscriber interface { @@ -539,15 +525,20 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa if err := validateReceiveSettings(settings); err != nil { return nil, err } + + var allClients apiClients subClient, err := newSubscriberClient(ctx, region, opts...) if err != nil { return nil, err } + allClients = append(allClients, subClient) + cursorClient, err := newCursorClient(ctx, region, opts...) if err != nil { + allClients.Close() return nil, err } - allClients := apiClients{subClient, cursorClient} + allClients = append(allClients, cursorClient) subFactory := &singlePartitionSubscriberFactory{ ctx: ctx, @@ -563,6 +554,7 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa } partitionClient, err := newPartitionAssignmentClient(ctx, region, opts...) if err != nil { + allClients.Close() return nil, err } allClients = append(allClients, partitionClient)