Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): close clients after publisher and subscriber have terminated #3512

Merged
merged 7 commits into from Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion pubsublite/admin_test.go
Expand Up @@ -27,7 +27,7 @@ import (
)

func newTestAdminClient(t *testing.T) *AdminClient {
admin, err := NewAdminClient(context.Background(), "us-central1", testClientOpts...)
admin, err := NewAdminClient(context.Background(), "us-central1", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -85,6 +85,7 @@ func TestAdminTopicCRUD(t *testing.T) {
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

if gotConfig, err := admin.CreateTopic(ctx, topicConfig); err != nil {
t.Errorf("CreateTopic() got err: %v", err)
Expand Down Expand Up @@ -172,6 +173,7 @@ func TestAdminListTopics(t *testing.T) {
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

var gotTopicConfigs []*TopicConfig
topicIt := admin.Topics(ctx, locationPath)
Expand Down Expand Up @@ -227,6 +229,7 @@ func TestAdminListTopicSubscriptions(t *testing.T) {
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

var gotSubscriptions []string
subsPathIt := admin.TopicSubscriptions(ctx, topicPath)
Expand Down Expand Up @@ -290,6 +293,7 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig); err != nil {
t.Errorf("CreateSubscription() got err: %v", err)
Expand Down Expand Up @@ -362,6 +366,7 @@ func TestAdminListSubscriptions(t *testing.T) {
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

var gotSubscriptionConfigs []*SubscriptionConfig
subscriptionIt := admin.Subscriptions(ctx, locationPath)
Expand Down
18 changes: 4 additions & 14 deletions pubsublite/internal/test/mock.go
Expand Up @@ -66,23 +66,13 @@ func NewServer() (*Server, error) {
return &Server{LiteServer: liteServer, gRPCServer: srv}, nil
}

// NewServerWithConn creates a new mock Pub/Sub Lite server along with client
// options to connect to it.
func NewServerWithConn() (*Server, []option.ClientOption) {
testServer, err := NewServer()
// ClientConn creates a client connection to the gRPC test server.
func (s *Server) ClientConn() option.ClientOption {
conn, err := grpc.Dial(s.gRPCServer.Addr, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
conn, err := grpc.Dial(testServer.Addr(), grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
return testServer, []option.ClientOption{option.WithGRPCConn(conn)}
}

// Addr returns the address that the server is listening on.
func (s *Server) Addr() string {
return s.gRPCServer.Addr
return option.WithGRPCConn(conn)
}

// Close shuts down the server and releases all resources.
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/assigner_test.go
Expand Up @@ -75,7 +75,7 @@ type testAssigner struct {

func newTestAssigner(t *testing.T, subscription string) *testAssigner {
ctx := context.Background()
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...)
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand All @@ -89,7 +89,7 @@ func newTestAssigner(t *testing.T, subscription string) *testAssigner {
t.Fatal(err)
}
ta.asn = asn
ta.initAndStart(t, ta.asn, "Assigner")
ta.initAndStart(t, ta.asn, "Assigner", assignmentClient)
return ta
}

Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/committer_test.go
Expand Up @@ -30,15 +30,15 @@ type testCommitter struct {

func newTestCommitter(t *testing.T, subscription subscriptionPartition, acks *ackTracker) *testCommitter {
ctx := context.Background()
cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...)
cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}

tc := &testCommitter{
cmt: newCommitter(ctx, cursorClient, testReceiveSettings(), subscription, acks, true),
}
tc.initAndStart(t, tc.cmt, "Committer")
tc.initAndStart(t, tc.cmt, "Committer", cursorClient)
return tc
}

Expand Down
12 changes: 7 additions & 5 deletions pubsublite/internal/wire/main_test.go
Expand Up @@ -15,25 +15,27 @@ package wire

import (
"flag"
"log"
"os"
"testing"

"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/api/option"
)

var (
// Initialized in TestMain.
mockServer test.MockServer
testClientOpts []option.ClientOption
testServer *test.Server
mockServer test.MockServer
)

func TestMain(m *testing.M) {
flag.Parse()

testServer, clientOpts := test.NewServerWithConn()
var err error
if testServer, err = test.NewServer(); err != nil {
log.Fatal(err)
}
mockServer = testServer.LiteServer
testClientOpts = clientOpts

exit := m.Run()
testServer.Close()
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/partition_count_test.go
Expand Up @@ -47,15 +47,15 @@ func (tw *testPartitionCountWatcher) UpdatePartitionCount() {

func newTestPartitionCountWatcher(t *testing.T, topicPath string, settings PublishSettings) *testPartitionCountWatcher {
ctx := context.Background()
adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...)
adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
tw := &testPartitionCountWatcher{
t: t,
}
tw.watcher = newPartitionCountWatcher(ctx, adminClient, testPublishSettings(), topicPath, tw.onCountChanged)
tw.initAndStart(t, tw.watcher, "PartitionCountWatcher")
tw.initAndStart(t, tw.watcher, "PartitionCountWatcher", adminClient)
return tw
}

Expand Down
13 changes: 11 additions & 2 deletions pubsublite/internal/wire/publisher.go
Expand Up @@ -279,6 +279,7 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() {
// count, but not decreasing.
type routingPublisher struct {
// Immutable after creation.
clients apiClients
msgRouterFactory *messageRouterFactory
pubFactory *singlePartitionPublisherFactory
partitionWatcher *partitionCountWatcher
Expand All @@ -290,8 +291,9 @@ type routingPublisher struct {
compositeService
}

func newRoutingPublisher(adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
pub := &routingPublisher{
clients: allClients,
msgRouterFactory: msgRouterFactory,
pubFactory: pubFactory,
}
Expand Down Expand Up @@ -357,6 +359,12 @@ func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePart
return rp.publishers[partition], nil
}

func (rp *routingPublisher) WaitStopped() error {
err := rp.compositeService.WaitStopped()
rp.clients.Close()
return err
}

// Publisher is the client interface exported from this package for publishing
// messages.
type Publisher interface {
Expand Down Expand Up @@ -385,6 +393,7 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa
if err != nil {
return nil, err
}
allClients := apiClients{pubClient, adminClient}

msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano())))
pubFactory := &singlePartitionPublisherFactory{
Expand All @@ -393,5 +402,5 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa
settings: settings,
topicPath: topicPath,
}
return newRoutingPublisher(adminClient, msgRouterFactory, pubFactory), nil
return newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory), nil
}
11 changes: 6 additions & 5 deletions pubsublite/internal/wire/publisher_test.go
Expand Up @@ -47,7 +47,7 @@ type testPartitionPublisher struct {

func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher {
ctx := context.Background()
pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...)
pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand All @@ -61,7 +61,7 @@ func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, setting
tp := &testPartitionPublisher{
pub: pubFactory.New(topic.Partition),
}
tp.initAndStart(t, tp.pub, "Publisher")
tp.initAndStart(t, tp.pub, "Publisher", pubClient)
return tp
}

Expand Down Expand Up @@ -506,14 +506,15 @@ type testRoutingPublisher struct {

func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher {
ctx := context.Background()
pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...)
pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...)
adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
allClients := apiClients{pubClient, adminClient}

source := &test.FakeSource{Ret: fakeSourceVal}
msgRouterFactory := newMessageRouterFactory(rand.New(source))
Expand All @@ -523,7 +524,7 @@ func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSet
settings: settings,
topicPath: topicPath,
}
pub := newRoutingPublisher(adminClient, msgRouterFactory, pubFactory)
pub := newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory)
pub.Start()
return &testRoutingPublisher{t: t, pub: pub}
}
Expand Down
15 changes: 15 additions & 0 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -138,6 +138,21 @@ func defaultClientOptions(region string) []option.ClientOption {
}
}

type apiClient interface {
Close() error
}

type apiClients []apiClient

func (ac apiClients) Close() (retErr error) {
for _, c := range ac {
if err := c.Close(); retErr == nil {
retErr = err
}
}
return
}

// NewAdminClient creates a new gapic AdminClient for a region.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
options := append(defaultClientOptions(region), opts...)
Expand Down
6 changes: 5 additions & 1 deletion pubsublite/internal/wire/service_util_test.go
Expand Up @@ -33,14 +33,16 @@ type serviceTestProxy struct {
t *testing.T
service service
name string
clients apiClients
started chan struct{}
terminated chan struct{}
}

func (sp *serviceTestProxy) initAndStart(t *testing.T, s service, name string) {
func (sp *serviceTestProxy) initAndStart(t *testing.T, s service, name string, clients ...apiClient) {
sp.t = t
sp.service = s
sp.name = name
sp.clients = clients
sp.started = make(chan struct{})
sp.terminated = make(chan struct{})
s.AddStatusChangeReceiver(nil, sp.onStatusChange)
Expand All @@ -65,6 +67,7 @@ func (sp *serviceTestProxy) StartError() error {
case <-time.After(serviceTestWaitTimeout):
return fmt.Errorf("%s did not start within %v", sp.name, serviceTestWaitTimeout)
case <-sp.terminated:
sp.clients.Close()
return sp.service.Error()
case <-sp.started:
return sp.service.Error()
Expand All @@ -77,6 +80,7 @@ func (sp *serviceTestProxy) FinalError() error {
case <-time.After(serviceTestWaitTimeout):
return fmt.Errorf("%s did not terminate within %v", sp.name, serviceTestWaitTimeout)
case <-sp.terminated:
sp.clients.Close()
return sp.service.Error()
}
}
Expand Down
7 changes: 6 additions & 1 deletion pubsublite/internal/wire/streams_test.go
Expand Up @@ -49,7 +49,7 @@ type testStreamHandler struct {

func newTestStreamHandler(t *testing.T, timeout time.Duration) *testStreamHandler {
ctx := context.Background()
pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...)
pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -105,6 +105,11 @@ func (sh *testStreamHandler) initialRequest() (interface{}, initialResponseRequi

func (sh *testStreamHandler) onStreamStatusChange(status streamStatus) {
sh.statuses <- status

// Close connections.
if status == streamTerminated {
sh.pubClient.Close()
}
}

func (sh *testStreamHandler) onResponse(response interface{}) {
Expand Down