diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index b77df47db41..b1646864f47 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -192,6 +192,12 @@ func defaultClientOptions(region string) []option.ClientOption { } } +func streamClientOptions(region string) []option.ClientOption { + // To ensure most users don't hit the limit of 100 streams per connection, if + // they have a high number of topic partitions. + return append(defaultClientOptions(region), option.WithGRPCConnectionPool(8)) +} + // NewAdminClient creates a new gapic AdminClient for a region. func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { options := append(defaultClientOptions(region), opts...) @@ -199,17 +205,17 @@ func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOpt } func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) { - options := append(defaultClientOptions(region), opts...) + options := append(streamClientOptions(region), opts...) return vkit.NewPublisherClient(ctx, options...) } func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) { - options := append(defaultClientOptions(region), opts...) + options := append(streamClientOptions(region), opts...) return vkit.NewSubscriberClient(ctx, options...) } func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) { - options := append(defaultClientOptions(region), opts...) + options := append(streamClientOptions(region), opts...) return vkit.NewCursorClient(ctx, options...) }