From 72d2affb957cea7b6a223b108d0fe67c5635b25c Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 20 Jan 2021 10:16:43 +1100 Subject: [PATCH] fix(pubsublite): close clients after publisher and subscriber have terminated (#3512) Close gapic clients in SubscriberClient, PublisherClient and tests in order to close client connections. Create new client connections to the mock server in unit tests. --- pubsublite/admin_test.go | 7 +++- pubsublite/internal/test/mock.go | 18 ++------- pubsublite/internal/wire/assigner_test.go | 4 +- pubsublite/internal/wire/committer_test.go | 4 +- pubsublite/internal/wire/main_test.go | 12 +++--- .../internal/wire/partition_count_test.go | 4 +- pubsublite/internal/wire/publisher.go | 13 ++++++- pubsublite/internal/wire/publisher_test.go | 11 +++--- pubsublite/internal/wire/rpc.go | 15 ++++++++ pubsublite/internal/wire/service_util_test.go | 6 ++- pubsublite/internal/wire/streams_test.go | 7 +++- pubsublite/internal/wire/subscriber.go | 30 ++++++++++++--- pubsublite/internal/wire/subscriber_test.go | 38 ++++++++++++------- pubsublite/main_test.go | 12 +++--- 14 files changed, 123 insertions(+), 58 deletions(-) diff --git a/pubsublite/admin_test.go b/pubsublite/admin_test.go index 7784b9ca5fc..522b8091c5d 100644 --- a/pubsublite/admin_test.go +++ b/pubsublite/admin_test.go @@ -27,7 +27,7 @@ import ( ) func newTestAdminClient(t *testing.T) *AdminClient { - admin, err := NewAdminClient(context.Background(), "us-central1", testClientOpts...) + admin, err := NewAdminClient(context.Background(), "us-central1", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -85,6 +85,7 @@ func TestAdminTopicCRUD(t *testing.T) { defer mockServer.OnTestEnd() admin := newTestAdminClient(t) + defer admin.Close() if gotConfig, err := admin.CreateTopic(ctx, topicConfig); err != nil { t.Errorf("CreateTopic() got err: %v", err) @@ -172,6 +173,7 @@ func TestAdminListTopics(t *testing.T) { defer mockServer.OnTestEnd() admin := newTestAdminClient(t) + defer admin.Close() var gotTopicConfigs []*TopicConfig topicIt := admin.Topics(ctx, locationPath) @@ -227,6 +229,7 @@ func TestAdminListTopicSubscriptions(t *testing.T) { defer mockServer.OnTestEnd() admin := newTestAdminClient(t) + defer admin.Close() var gotSubscriptions []string subsPathIt := admin.TopicSubscriptions(ctx, topicPath) @@ -290,6 +293,7 @@ func TestAdminSubscriptionCRUD(t *testing.T) { defer mockServer.OnTestEnd() admin := newTestAdminClient(t) + defer admin.Close() if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig); err != nil { t.Errorf("CreateSubscription() got err: %v", err) @@ -362,6 +366,7 @@ func TestAdminListSubscriptions(t *testing.T) { defer mockServer.OnTestEnd() admin := newTestAdminClient(t) + defer admin.Close() var gotSubscriptionConfigs []*SubscriptionConfig subscriptionIt := admin.Subscriptions(ctx, locationPath) diff --git a/pubsublite/internal/test/mock.go b/pubsublite/internal/test/mock.go index 515fbec2668..425cda76c37 100644 --- a/pubsublite/internal/test/mock.go +++ b/pubsublite/internal/test/mock.go @@ -66,23 +66,13 @@ func NewServer() (*Server, error) { return &Server{LiteServer: liteServer, gRPCServer: srv}, nil } -// NewServerWithConn creates a new mock Pub/Sub Lite server along with client -// options to connect to it. -func NewServerWithConn() (*Server, []option.ClientOption) { - testServer, err := NewServer() +// ClientConn creates a client connection to the gRPC test server. +func (s *Server) ClientConn() option.ClientOption { + conn, err := grpc.Dial(s.gRPCServer.Addr, grpc.WithInsecure()) if err != nil { log.Fatal(err) } - conn, err := grpc.Dial(testServer.Addr(), grpc.WithInsecure()) - if err != nil { - log.Fatal(err) - } - return testServer, []option.ClientOption{option.WithGRPCConn(conn)} -} - -// Addr returns the address that the server is listening on. -func (s *Server) Addr() string { - return s.gRPCServer.Addr + return option.WithGRPCConn(conn) } // Close shuts down the server and releases all resources. diff --git a/pubsublite/internal/wire/assigner_test.go b/pubsublite/internal/wire/assigner_test.go index 64e761f1993..4e3c15ba529 100644 --- a/pubsublite/internal/wire/assigner_test.go +++ b/pubsublite/internal/wire/assigner_test.go @@ -75,7 +75,7 @@ type testAssigner struct { func newTestAssigner(t *testing.T, subscription string) *testAssigner { ctx := context.Background() - assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...) + assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -89,7 +89,7 @@ func newTestAssigner(t *testing.T, subscription string) *testAssigner { t.Fatal(err) } ta.asn = asn - ta.initAndStart(t, ta.asn, "Assigner") + ta.initAndStart(t, ta.asn, "Assigner", assignmentClient) return ta } diff --git a/pubsublite/internal/wire/committer_test.go b/pubsublite/internal/wire/committer_test.go index 6ebd4fbaf18..59a0b5d6b7f 100644 --- a/pubsublite/internal/wire/committer_test.go +++ b/pubsublite/internal/wire/committer_test.go @@ -30,7 +30,7 @@ type testCommitter struct { func newTestCommitter(t *testing.T, subscription subscriptionPartition, acks *ackTracker) *testCommitter { ctx := context.Background() - cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...) + cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -38,7 +38,7 @@ func newTestCommitter(t *testing.T, subscription subscriptionPartition, acks *ac tc := &testCommitter{ cmt: newCommitter(ctx, cursorClient, testReceiveSettings(), subscription, acks, true), } - tc.initAndStart(t, tc.cmt, "Committer") + tc.initAndStart(t, tc.cmt, "Committer", cursorClient) return tc } diff --git a/pubsublite/internal/wire/main_test.go b/pubsublite/internal/wire/main_test.go index a9dbc679194..97165a470f0 100644 --- a/pubsublite/internal/wire/main_test.go +++ b/pubsublite/internal/wire/main_test.go @@ -15,25 +15,27 @@ package wire import ( "flag" + "log" "os" "testing" "cloud.google.com/go/pubsublite/internal/test" - "google.golang.org/api/option" ) var ( // Initialized in TestMain. - mockServer test.MockServer - testClientOpts []option.ClientOption + testServer *test.Server + mockServer test.MockServer ) func TestMain(m *testing.M) { flag.Parse() - testServer, clientOpts := test.NewServerWithConn() + var err error + if testServer, err = test.NewServer(); err != nil { + log.Fatal(err) + } mockServer = testServer.LiteServer - testClientOpts = clientOpts exit := m.Run() testServer.Close() diff --git a/pubsublite/internal/wire/partition_count_test.go b/pubsublite/internal/wire/partition_count_test.go index 09177e578f3..05d9558b214 100644 --- a/pubsublite/internal/wire/partition_count_test.go +++ b/pubsublite/internal/wire/partition_count_test.go @@ -47,7 +47,7 @@ func (tw *testPartitionCountWatcher) UpdatePartitionCount() { func newTestPartitionCountWatcher(t *testing.T, topicPath string, settings PublishSettings) *testPartitionCountWatcher { ctx := context.Background() - adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...) + adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -55,7 +55,7 @@ func newTestPartitionCountWatcher(t *testing.T, topicPath string, settings Publi t: t, } tw.watcher = newPartitionCountWatcher(ctx, adminClient, testPublishSettings(), topicPath, tw.onCountChanged) - tw.initAndStart(t, tw.watcher, "PartitionCountWatcher") + tw.initAndStart(t, tw.watcher, "PartitionCountWatcher", adminClient) return tw } diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index 6fcf8f0cf26..19a5a0255c7 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -279,6 +279,7 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() { // count, but not decreasing. type routingPublisher struct { // Immutable after creation. + clients apiClients msgRouterFactory *messageRouterFactory pubFactory *singlePartitionPublisherFactory partitionWatcher *partitionCountWatcher @@ -290,8 +291,9 @@ type routingPublisher struct { compositeService } -func newRoutingPublisher(adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher { +func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher { pub := &routingPublisher{ + clients: allClients, msgRouterFactory: msgRouterFactory, pubFactory: pubFactory, } @@ -357,6 +359,12 @@ func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePart return rp.publishers[partition], nil } +func (rp *routingPublisher) WaitStopped() error { + err := rp.compositeService.WaitStopped() + rp.clients.Close() + return err +} + // Publisher is the client interface exported from this package for publishing // messages. type Publisher interface { @@ -385,6 +393,7 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa if err != nil { return nil, err } + allClients := apiClients{pubClient, adminClient} msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano()))) pubFactory := &singlePartitionPublisherFactory{ @@ -393,5 +402,5 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa settings: settings, topicPath: topicPath, } - return newRoutingPublisher(adminClient, msgRouterFactory, pubFactory), nil + return newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory), nil } diff --git a/pubsublite/internal/wire/publisher_test.go b/pubsublite/internal/wire/publisher_test.go index 0b57135747e..1f58d3a0dcc 100644 --- a/pubsublite/internal/wire/publisher_test.go +++ b/pubsublite/internal/wire/publisher_test.go @@ -47,7 +47,7 @@ type testPartitionPublisher struct { func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher { ctx := context.Background() - pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...) + pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -61,7 +61,7 @@ func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, setting tp := &testPartitionPublisher{ pub: pubFactory.New(topic.Partition), } - tp.initAndStart(t, tp.pub, "Publisher") + tp.initAndStart(t, tp.pub, "Publisher", pubClient) return tp } @@ -506,14 +506,15 @@ type testRoutingPublisher struct { func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher { ctx := context.Background() - pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...) + pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } - adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...) + adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } + allClients := apiClients{pubClient, adminClient} source := &test.FakeSource{Ret: fakeSourceVal} msgRouterFactory := newMessageRouterFactory(rand.New(source)) @@ -523,7 +524,7 @@ func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSet settings: settings, topicPath: topicPath, } - pub := newRoutingPublisher(adminClient, msgRouterFactory, pubFactory) + pub := newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory) pub.Start() return &testRoutingPublisher{t: t, pub: pub} } diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index c0bd1a96ab6..fdca3e8b8b1 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -138,6 +138,21 @@ func defaultClientOptions(region string) []option.ClientOption { } } +type apiClient interface { + Close() error +} + +type apiClients []apiClient + +func (ac apiClients) Close() (retErr error) { + for _, c := range ac { + if err := c.Close(); retErr == nil { + retErr = err + } + } + return +} + // NewAdminClient creates a new gapic AdminClient for a region. func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { options := append(defaultClientOptions(region), opts...) diff --git a/pubsublite/internal/wire/service_util_test.go b/pubsublite/internal/wire/service_util_test.go index e55d1609ba5..60a138fb2bc 100644 --- a/pubsublite/internal/wire/service_util_test.go +++ b/pubsublite/internal/wire/service_util_test.go @@ -33,14 +33,16 @@ type serviceTestProxy struct { t *testing.T service service name string + clients apiClients started chan struct{} terminated chan struct{} } -func (sp *serviceTestProxy) initAndStart(t *testing.T, s service, name string) { +func (sp *serviceTestProxy) initAndStart(t *testing.T, s service, name string, clients ...apiClient) { sp.t = t sp.service = s sp.name = name + sp.clients = clients sp.started = make(chan struct{}) sp.terminated = make(chan struct{}) s.AddStatusChangeReceiver(nil, sp.onStatusChange) @@ -65,6 +67,7 @@ func (sp *serviceTestProxy) StartError() error { case <-time.After(serviceTestWaitTimeout): return fmt.Errorf("%s did not start within %v", sp.name, serviceTestWaitTimeout) case <-sp.terminated: + sp.clients.Close() return sp.service.Error() case <-sp.started: return sp.service.Error() @@ -77,6 +80,7 @@ func (sp *serviceTestProxy) FinalError() error { case <-time.After(serviceTestWaitTimeout): return fmt.Errorf("%s did not terminate within %v", sp.name, serviceTestWaitTimeout) case <-sp.terminated: + sp.clients.Close() return sp.service.Error() } } diff --git a/pubsublite/internal/wire/streams_test.go b/pubsublite/internal/wire/streams_test.go index 2604e11429f..968fd48f28c 100644 --- a/pubsublite/internal/wire/streams_test.go +++ b/pubsublite/internal/wire/streams_test.go @@ -49,7 +49,7 @@ type testStreamHandler struct { func newTestStreamHandler(t *testing.T, timeout time.Duration) *testStreamHandler { ctx := context.Background() - pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...) + pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -105,6 +105,11 @@ func (sh *testStreamHandler) initialRequest() (interface{}, initialResponseRequi func (sh *testStreamHandler) onStreamStatusChange(status streamStatus) { sh.statuses <- status + + // Close connections. + if status == streamTerminated { + sh.pubClient.Close() + } } func (sh *testStreamHandler) onResponse(response interface{}) { diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 2f0d2b9413e..4da86acb7d2 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -379,13 +379,17 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu // multiPartitionSubscriber receives messages from a fixed set of topic // partitions. type multiPartitionSubscriber struct { + // Immutable after creation. + clients apiClients subscribers []*singlePartitionSubscriber compositeService } -func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber { - ms := new(multiPartitionSubscriber) +func newMultiPartitionSubscriber(allClients apiClients, subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber { + ms := &multiPartitionSubscriber{ + clients: allClients, + } ms.init() for _, partition := range subFactory.settings.Partitions { @@ -407,11 +411,18 @@ func (ms *multiPartitionSubscriber) Terminate() { } } +func (ms *multiPartitionSubscriber) WaitStopped() error { + err := ms.compositeService.WaitStopped() + ms.clients.Close() + return err +} + // assigningSubscriber uses the Pub/Sub Lite partition assignment service to // listen to its assigned partition numbers and dynamically add/remove // singlePartitionSubscribers. type assigningSubscriber struct { // Immutable after creation. + clients apiClients subFactory *singlePartitionSubscriberFactory assigner *assigner @@ -422,8 +433,9 @@ type assigningSubscriber struct { compositeService } -func newAssigningSubscriber(assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) { +func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) { as := &assigningSubscriber{ + clients: allClients, subFactory: subFactory, subscribers: make(map[int]*singlePartitionSubscriber), } @@ -477,6 +489,12 @@ func (as *assigningSubscriber) Terminate() { } } +func (as *assigningSubscriber) WaitStopped() error { + err := as.compositeService.WaitStopped() + as.clients.Close() + return err +} + // Subscriber is the client interface exported from this package for receiving // messages. type Subscriber interface { @@ -503,6 +521,7 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa if err != nil { return nil, err } + allClients := apiClients{subClient, cursorClient} subFactory := &singlePartitionSubscriberFactory{ ctx: ctx, @@ -514,11 +533,12 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa } if len(settings.Partitions) > 0 { - return newMultiPartitionSubscriber(subFactory), nil + return newMultiPartitionSubscriber(allClients, subFactory), nil } partitionClient, err := newPartitionAssignmentClient(ctx, region, opts...) if err != nil { return nil, err } - return newAssigningSubscriber(partitionClient, uuid.NewRandom, subFactory) + allClients = append(allClients, partitionClient) + return newAssigningSubscriber(allClients, partitionClient, uuid.NewRandom, subFactory) } diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index de5c59ba843..5ebed5c1c82 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -187,7 +187,7 @@ type testSubscribeStream struct { func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings, acks *ackTracker) *testSubscribeStream { ctx := context.Background() - subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...) + subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -197,7 +197,7 @@ func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, se t: t, } ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, acks, true) - ts.initAndStart(t, ts.sub, "Subscriber") + ts.initAndStart(t, ts.sub, "Subscriber", subClient) return ts } @@ -433,13 +433,23 @@ func TestSubscribeStreamFlowControlOverflow(t *testing.T) { } } -func newTestSinglePartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscription subscriptionPartition) *singlePartitionSubscriber { +type testSinglePartitionSubscriber singlePartitionSubscriber + +func (t *testSinglePartitionSubscriber) WaitStopped() error { + err := t.compositeService.WaitStopped() + // Close connections. + t.committer.cursorClient.Close() + t.subscriber.subClient.Close() + return err +} + +func newTestSinglePartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscription subscriptionPartition) *testSinglePartitionSubscriber { ctx := context.Background() - subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...) + subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } - cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...) + cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } @@ -455,7 +465,7 @@ func newTestSinglePartitionSubscriber(t *testing.T, receiverFunc MessageReceiver } sub := f.New(subscription.Partition) sub.Start() - return sub + return (*testSinglePartitionSubscriber)(sub) } func TestSinglePartitionSubscriberStartStop(t *testing.T) { @@ -622,14 +632,15 @@ func TestSinglePartitionSubscriberStopDuringReceive(t *testing.T) { func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string, partitions []int) *multiPartitionSubscriber { ctx := context.Background() - subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...) + subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } - cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...) + cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } + allClients := apiClients{subClient, cursorClient} f := &singlePartitionSubscriberFactory{ ctx: ctx, @@ -641,7 +652,7 @@ func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverF disableTasks: true, // Background tasks disabled to control event order } f.settings.Partitions = partitions - sub := newMultiPartitionSubscriber(f) + sub := newMultiPartitionSubscriber(allClients, f) sub.Start() return sub } @@ -769,18 +780,19 @@ func (as *assigningSubscriber) FlushCommits() { func newTestAssigningSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string) *assigningSubscriber { ctx := context.Background() - subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...) + subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } - cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...) + cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } - assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...) + assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn()) if err != nil { t.Fatal(err) } + allClients := apiClients{subClient, cursorClient, assignmentClient} f := &singlePartitionSubscriberFactory{ ctx: ctx, @@ -791,7 +803,7 @@ func newTestAssigningSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, receiver: receiverFunc, disableTasks: true, // Background tasks disabled to control event order } - sub, err := newAssigningSubscriber(assignmentClient, fakeGenerateUUID, f) + sub, err := newAssigningSubscriber(allClients, assignmentClient, fakeGenerateUUID, f) if err != nil { t.Fatal(err) } diff --git a/pubsublite/main_test.go b/pubsublite/main_test.go index 0f325f8d904..1e0b613c952 100644 --- a/pubsublite/main_test.go +++ b/pubsublite/main_test.go @@ -15,25 +15,27 @@ package pubsublite import ( "flag" + "log" "os" "testing" "cloud.google.com/go/pubsublite/internal/test" - "google.golang.org/api/option" ) var ( // Initialized in TestMain. - mockServer test.MockServer - testClientOpts []option.ClientOption + testServer *test.Server + mockServer test.MockServer ) func TestMain(m *testing.M) { flag.Parse() - testServer, clientOpts := test.NewServerWithConn() + var err error + if testServer, err = test.NewServer(); err != nil { + log.Fatal(err) + } mockServer = testServer.LiteServer - testClientOpts = clientOpts exit := m.Run() testServer.Close()