Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): close clients after publisher and subscriber have terminated #3512

Merged
merged 7 commits into from Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsublite/admin_test.go
Expand Up @@ -27,7 +27,7 @@ import (
)

func newTestAdminClient(t *testing.T) *AdminClient {
admin, err := NewAdminClient(context.Background(), "us-central1", testClientOpts...)
admin, err := NewAdminClient(context.Background(), "us-central1", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down
18 changes: 4 additions & 14 deletions pubsublite/internal/test/mock.go
Expand Up @@ -66,23 +66,13 @@ func NewServer() (*Server, error) {
return &Server{LiteServer: liteServer, gRPCServer: srv}, nil
}

// NewServerWithConn creates a new mock Pub/Sub Lite server along with client
// options to connect to it.
func NewServerWithConn() (*Server, []option.ClientOption) {
testServer, err := NewServer()
// ClientConn creates a client connection to the gRPC test server.
func (s *Server) ClientConn() option.ClientOption {
conn, err := grpc.Dial(s.gRPCServer.Addr, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
conn, err := grpc.Dial(testServer.Addr(), grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
return testServer, []option.ClientOption{option.WithGRPCConn(conn)}
}

// Addr returns the address that the server is listening on.
func (s *Server) Addr() string {
return s.gRPCServer.Addr
return option.WithGRPCConn(conn)
}

// Close shuts down the server and releases all resources.
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/assigner_test.go
Expand Up @@ -75,7 +75,7 @@ type testAssigner struct {

func newTestAssigner(t *testing.T, subscription string) *testAssigner {
ctx := context.Background()
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...)
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/committer_test.go
Expand Up @@ -30,7 +30,7 @@ type testCommitter struct {

func newTestCommitter(t *testing.T, subscription subscriptionPartition, acks *ackTracker) *testCommitter {
ctx := context.Background()
cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...)
cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 7 additions & 5 deletions pubsublite/internal/wire/main_test.go
Expand Up @@ -15,25 +15,27 @@ package wire

import (
"flag"
"log"
"os"
"testing"

"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/api/option"
)

var (
// Initialized in TestMain.
mockServer test.MockServer
testClientOpts []option.ClientOption
testServer *test.Server
mockServer test.MockServer
)

func TestMain(m *testing.M) {
flag.Parse()

testServer, clientOpts := test.NewServerWithConn()
var err error
if testServer, err = test.NewServer(); err != nil {
log.Fatal(err)
}
mockServer = testServer.LiteServer
testClientOpts = clientOpts

exit := m.Run()
testServer.Close()
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/partition_count_test.go
Expand Up @@ -47,7 +47,7 @@ func (tw *testPartitionCountWatcher) UpdatePartitionCount() {

func newTestPartitionCountWatcher(t *testing.T, topicPath string, settings PublishSettings) *testPartitionCountWatcher {
ctx := context.Background()
adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...)
adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down
13 changes: 11 additions & 2 deletions pubsublite/internal/wire/publisher.go
Expand Up @@ -279,6 +279,7 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() {
// count, but not decreasing.
type routingPublisher struct {
// Immutable after creation.
clients apiClients
msgRouterFactory *messageRouterFactory
pubFactory *singlePartitionPublisherFactory
partitionWatcher *partitionCountWatcher
Expand All @@ -290,8 +291,9 @@ type routingPublisher struct {
compositeService
}

func newRoutingPublisher(adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
pub := &routingPublisher{
clients: allClients,
msgRouterFactory: msgRouterFactory,
pubFactory: pubFactory,
}
Expand Down Expand Up @@ -357,6 +359,12 @@ func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePart
return rp.publishers[partition], nil
}

func (rp *routingPublisher) WaitStopped() error {
err := rp.compositeService.WaitStopped()
rp.clients.Close()
return err
}

// Publisher is the client interface exported from this package for publishing
// messages.
type Publisher interface {
Expand Down Expand Up @@ -385,6 +393,7 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa
if err != nil {
return nil, err
}
allClients := apiClients{pubClient, adminClient}

msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano())))
pubFactory := &singlePartitionPublisherFactory{
Expand All @@ -393,5 +402,5 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa
settings: settings,
topicPath: topicPath,
}
return newRoutingPublisher(adminClient, msgRouterFactory, pubFactory), nil
return newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory), nil
}
9 changes: 5 additions & 4 deletions pubsublite/internal/wire/publisher_test.go
Expand Up @@ -47,7 +47,7 @@ type testPartitionPublisher struct {

func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher {
ctx := context.Background()
pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...)
pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -506,14 +506,15 @@ type testRoutingPublisher struct {

func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher {
ctx := context.Background()
pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...)
pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...)
adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
allClients := apiClients{pubClient, adminClient}

source := &test.FakeSource{Ret: fakeSourceVal}
msgRouterFactory := newMessageRouterFactory(rand.New(source))
Expand All @@ -523,7 +524,7 @@ func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSet
settings: settings,
topicPath: topicPath,
}
pub := newRoutingPublisher(adminClient, msgRouterFactory, pubFactory)
pub := newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory)
pub.Start()
return &testRoutingPublisher{t: t, pub: pub}
}
Expand Down
15 changes: 15 additions & 0 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -138,6 +138,21 @@ func defaultClientOptions(region string) []option.ClientOption {
}
}

type apiClient interface {
Close() error
}

type apiClients []apiClient

func (ac apiClients) Close() (retErr error) {
for _, c := range ac {
if err := c.Close(); retErr == nil {
retErr = err
}
}
return
}

// NewAdminClient creates a new gapic AdminClient for a region.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
options := append(defaultClientOptions(region), opts...)
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/streams_test.go
Expand Up @@ -49,7 +49,7 @@ type testStreamHandler struct {

func newTestStreamHandler(t *testing.T, timeout time.Duration) *testStreamHandler {
ctx := context.Background()
pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...)
pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down
18 changes: 13 additions & 5 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -379,13 +379,17 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu
// multiPartitionSubscriber receives messages from a fixed set of topic
// partitions.
type multiPartitionSubscriber struct {
// Immutable after creation.
clients apiClients
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
subscribers []*singlePartitionSubscriber

compositeService
}

func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := new(multiPartitionSubscriber)
func newMultiPartitionSubscriber(allClients apiClients, subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := &multiPartitionSubscriber{
clients: allClients,
}
ms.init()

for _, partition := range subFactory.settings.Partitions {
Expand All @@ -412,6 +416,7 @@ func (ms *multiPartitionSubscriber) Terminate() {
// singlePartitionSubscribers.
type assigningSubscriber struct {
// Immutable after creation.
clients apiClients
subFactory *singlePartitionSubscriberFactory
assigner *assigner

Expand All @@ -422,8 +427,9 @@ type assigningSubscriber struct {
compositeService
}

func newAssigningSubscriber(assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) {
func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) {
as := &assigningSubscriber{
clients: allClients,
subFactory: subFactory,
subscribers: make(map[int]*singlePartitionSubscriber),
}
Expand Down Expand Up @@ -503,6 +509,7 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa
if err != nil {
return nil, err
}
allClients := apiClients{subClient, cursorClient}

subFactory := &singlePartitionSubscriberFactory{
ctx: ctx,
Expand All @@ -514,11 +521,12 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa
}

if len(settings.Partitions) > 0 {
return newMultiPartitionSubscriber(subFactory), nil
return newMultiPartitionSubscriber(allClients, subFactory), nil
}
partitionClient, err := newPartitionAssignmentClient(ctx, region, opts...)
if err != nil {
return nil, err
}
return newAssigningSubscriber(partitionClient, uuid.NewRandom, subFactory)
allClients = append(allClients, partitionClient)
return newAssigningSubscriber(allClients, partitionClient, uuid.NewRandom, subFactory)
}
22 changes: 12 additions & 10 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -187,7 +187,7 @@ type testSubscribeStream struct {

func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings, acks *ackTracker) *testSubscribeStream {
ctx := context.Background()
subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...)
subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -435,11 +435,11 @@ func TestSubscribeStreamFlowControlOverflow(t *testing.T) {

func newTestSinglePartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscription subscriptionPartition) *singlePartitionSubscriber {
ctx := context.Background()
subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...)
subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...)
cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -622,14 +622,15 @@ func TestSinglePartitionSubscriberStopDuringReceive(t *testing.T) {

func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string, partitions []int) *multiPartitionSubscriber {
ctx := context.Background()
subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...)
subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...)
cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
allClients := apiClients{subClient, cursorClient}

f := &singlePartitionSubscriberFactory{
ctx: ctx,
Expand All @@ -641,7 +642,7 @@ func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverF
disableTasks: true, // Background tasks disabled to control event order
}
f.settings.Partitions = partitions
sub := newMultiPartitionSubscriber(f)
sub := newMultiPartitionSubscriber(allClients, f)
sub.Start()
return sub
}
Expand Down Expand Up @@ -769,18 +770,19 @@ func (as *assigningSubscriber) FlushCommits() {

func newTestAssigningSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string) *assigningSubscriber {
ctx := context.Background()
subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...)
subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...)
cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...)
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
allClients := apiClients{subClient, cursorClient, assignmentClient}

f := &singlePartitionSubscriberFactory{
ctx: ctx,
Expand All @@ -791,7 +793,7 @@ func newTestAssigningSubscriber(t *testing.T, receiverFunc MessageReceiverFunc,
receiver: receiverFunc,
disableTasks: true, // Background tasks disabled to control event order
}
sub, err := newAssigningSubscriber(assignmentClient, fakeGenerateUUID, f)
sub, err := newAssigningSubscriber(allClients, assignmentClient, fakeGenerateUUID, f)
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 7 additions & 5 deletions pubsublite/main_test.go
Expand Up @@ -15,25 +15,27 @@ package pubsublite

import (
"flag"
"log"
"os"
"testing"

"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/api/option"
)

var (
// Initialized in TestMain.
mockServer test.MockServer
testClientOpts []option.ClientOption
testServer *test.Server
mockServer test.MockServer
)

func TestMain(m *testing.M) {
flag.Parse()

testServer, clientOpts := test.NewServerWithConn()
var err error
if testServer, err = test.NewServer(); err != nil {
log.Fatal(err)
}
mockServer = testServer.LiteServer
testClientOpts = clientOpts

exit := m.Run()
testServer.Close()
Expand Down