From b7ce742db1acdd18b5a597ebb2a2111953c0942a Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 20 Jul 2021 11:38:02 +1000 Subject: [PATCH] fix(pubsublite): set a default grpc connection pool size of 8 (#4462) To ensure most users don't hit the limit of 100 streams per connection, if they have a high number of topic partitions. --- pubsublite/internal/wire/rpc.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index b77df47db41..b1646864f47 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -192,6 +192,12 @@ func defaultClientOptions(region string) []option.ClientOption { } } +func streamClientOptions(region string) []option.ClientOption { + // To ensure most users don't hit the limit of 100 streams per connection, if + // they have a high number of topic partitions. + return append(defaultClientOptions(region), option.WithGRPCConnectionPool(8)) +} + // NewAdminClient creates a new gapic AdminClient for a region. func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { options := append(defaultClientOptions(region), opts...) @@ -199,17 +205,17 @@ func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOpt } func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) { - options := append(defaultClientOptions(region), opts...) + options := append(streamClientOptions(region), opts...) return vkit.NewPublisherClient(ctx, options...) } func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) { - options := append(defaultClientOptions(region), opts...) + options := append(streamClientOptions(region), opts...) return vkit.NewSubscriberClient(ctx, options...) } func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) { - options := append(defaultClientOptions(region), opts...) + options := append(streamClientOptions(region), opts...) return vkit.NewCursorClient(ctx, options...) }