diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index 64a6665a183..3486729db6d 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -29,11 +29,13 @@ const ( MaxPublishMessageBytes = 1000000 // MaxPublishRequestBytes is the maximum allowed serialized size of a single - // publish request (containing a batch of messages) in bytes. + // publish request (containing a batch of messages) in bytes. Must be lower + // than the gRPC limit of 4 MiB. MaxPublishRequestBytes = 3500000 ) -// PublishSettings control the batching of published messages. +// PublishSettings control the batching of published messages. These settings +// apply per partition. type PublishSettings struct { // Publish a non-empty batch after this delay has passed. Must be > 0. DelayThreshold time.Duration @@ -71,7 +73,7 @@ var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, CountThreshold: 100, ByteThreshold: 1e6, - Timeout: 60 * time.Second, + Timeout: 10 * time.Minute, // By default set to a high limit that is not likely to occur, but prevents // OOM errors in clients. BufferedByteLimit: 1 << 30, // 1 GiB @@ -101,3 +103,61 @@ func validatePublishSettings(settings PublishSettings) error { } return nil } + +// ReceiveSettings control the receiving of messages. These settings apply +// per partition. +type ReceiveSettings struct { + // MaxOutstandingMessages is the maximum number of unacknowledged messages. + // Must be > 0. + MaxOutstandingMessages int + + // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged + // messages. Must be > 0. + MaxOutstandingBytes int + + // The maximum time that the client will attempt to establish a subscribe + // stream connection to the server. Must be > 0. + // + // The timeout is exceeded, the subscriber will terminate with the last error + // that occurred while trying to reconnect. + Timeout time.Duration + + // The topic partition numbers (zero-indexed) to receive messages from. + // Values must be less than the number of partitions for the topic. If not + // specified, the client will use the partition assignment service to + // determine which partitions it should connect to. + Partitions []int +} + +// DefaultReceiveSettings holds the default values for ReceiveSettings. +var DefaultReceiveSettings = ReceiveSettings{ + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 1e9, + Timeout: 10 * time.Minute, +} + +func validateReceiveSettings(settings ReceiveSettings) error { + if settings.MaxOutstandingMessages <= 0 { + return errors.New("pubsublite: invalid receive settings. MaxOutstandingMessages must be > 0") + } + if settings.MaxOutstandingBytes <= 0 { + return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0") + } + if settings.Timeout <= 0 { + return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0") + } + if len(settings.Partitions) > 0 { + var void struct{} + partitionMap := make(map[int]struct{}) + for _, p := range settings.Partitions { + if p < 0 { + return fmt.Errorf("pubsublite: invalid partition number %d in receive settings. Partition numbers are zero-indexed", p) + } + if _, exists := partitionMap[p]; exists { + return fmt.Errorf("pubsublite: invalid receive settings. Duplicate partition number %d", p) + } + partitionMap[p] = void + } + } + return nil +} diff --git a/pubsublite/internal/wire/settings_test.go b/pubsublite/internal/wire/settings_test.go index 175ca6d3aa7..400e79c0602 100644 --- a/pubsublite/internal/wire/settings_test.go +++ b/pubsublite/internal/wire/settings_test.go @@ -21,90 +21,81 @@ import ( func TestValidatePublishSettings(t *testing.T) { for _, tc := range []struct { desc string - // settingsFunc is passed DefaultPublishSettings - settingsFunc func(settings PublishSettings) PublishSettings - wantErr bool + // mutateSettings is passed a copy of DefaultPublishSettings to mutate. + mutateSettings func(settings *PublishSettings) + wantErr bool }{ { - desc: "valid: default", - settingsFunc: func(settings PublishSettings) PublishSettings { - return DefaultPublishSettings - }, - wantErr: false, + desc: "valid: default", + mutateSettings: func(settings *PublishSettings) {}, + wantErr: false, }, { desc: "valid: max", - settingsFunc: func(settings PublishSettings) PublishSettings { - return PublishSettings{ - CountThreshold: MaxPublishRequestCount, - ByteThreshold: MaxPublishRequestBytes, - // These have no max bounds, check large values. - DelayThreshold: 24 * time.Hour, - Timeout: 24 * time.Hour, - BufferedByteLimit: 1e10, - } + mutateSettings: func(settings *PublishSettings) { + settings.CountThreshold = MaxPublishRequestCount + settings.ByteThreshold = MaxPublishRequestBytes + // These have no max bounds, check large values. + settings.DelayThreshold = 24 * time.Hour + settings.Timeout = 24 * time.Hour + settings.BufferedByteLimit = 1e10 }, wantErr: false, }, { desc: "invalid: zero CountThreshold", - settingsFunc: func(settings PublishSettings) PublishSettings { + mutateSettings: func(settings *PublishSettings) { settings.CountThreshold = 0 - return settings }, wantErr: true, }, { desc: "invalid: CountThreshold over MaxPublishRequestCount", - settingsFunc: func(settings PublishSettings) PublishSettings { + mutateSettings: func(settings *PublishSettings) { settings.CountThreshold = MaxPublishRequestCount + 1 - return settings }, wantErr: true, }, { desc: "invalid: ByteThreshold over MaxPublishRequestBytes", - settingsFunc: func(settings PublishSettings) PublishSettings { + mutateSettings: func(settings *PublishSettings) { settings.ByteThreshold = MaxPublishRequestBytes + 1 - return settings }, wantErr: true, }, { desc: "invalid: zero ByteThreshold", - settingsFunc: func(settings PublishSettings) PublishSettings { + mutateSettings: func(settings *PublishSettings) { settings.ByteThreshold = 0 - return settings }, wantErr: true, }, { desc: "invalid: zero DelayThreshold", - settingsFunc: func(settings PublishSettings) PublishSettings { + mutateSettings: func(settings *PublishSettings) { settings.DelayThreshold = time.Duration(0) - return settings }, wantErr: true, }, { desc: "invalid: zero Timeout", - settingsFunc: func(settings PublishSettings) PublishSettings { + mutateSettings: func(settings *PublishSettings) { settings.Timeout = time.Duration(0) - return settings }, wantErr: true, }, { desc: "invalid: zero BufferedByteLimit", - settingsFunc: func(settings PublishSettings) PublishSettings { + mutateSettings: func(settings *PublishSettings) { settings.BufferedByteLimit = 0 - return settings }, wantErr: true, }, } { t.Run(tc.desc, func(t *testing.T) { - settings := tc.settingsFunc(DefaultPublishSettings) + settings := DefaultPublishSettings + tc.mutateSettings(&settings) + gotErr := validatePublishSettings(settings) if (gotErr != nil) != tc.wantErr { t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr) @@ -112,3 +103,67 @@ func TestValidatePublishSettings(t *testing.T) { }) } } + +func TestValidateReceiveSettings(t *testing.T) { + for _, tc := range []struct { + desc string + // mutateSettings is passed a copy of DefaultReceiveSettings to mutate. + mutateSettings func(settings *ReceiveSettings) + wantErr bool + }{ + { + desc: "valid: default", + mutateSettings: func(settings *ReceiveSettings) {}, + wantErr: false, + }, + { + desc: "valid: max", + mutateSettings: func(settings *ReceiveSettings) { + settings.Partitions = []int{5, 3, 9, 1, 4, 0} + // These have no max bounds, check large values. + settings.MaxOutstandingMessages = 100000 + settings.MaxOutstandingBytes = 1e10 + settings.Timeout = 24 * time.Hour + }, + wantErr: false, + }, + { + desc: "invalid: zero MaxOutstandingMessages", + mutateSettings: func(settings *ReceiveSettings) { + settings.MaxOutstandingMessages = 0 + }, + wantErr: true, + }, + { + desc: "invalid: zero MaxOutstandingBytes", + mutateSettings: func(settings *ReceiveSettings) { + settings.MaxOutstandingBytes = 0 + }, + wantErr: true, + }, + { + desc: "invalid: negative partition", + mutateSettings: func(settings *ReceiveSettings) { + settings.Partitions = []int{0, -1} + }, + wantErr: true, + }, + { + desc: "invalid: duplicate partition", + mutateSettings: func(settings *ReceiveSettings) { + settings.Partitions = []int{0, 1, 2, 3, 4, 1} + }, + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + settings := DefaultReceiveSettings + tc.mutateSettings(&settings) + + gotErr := validateReceiveSettings(settings) + if (gotErr != nil) != tc.wantErr { + t.Errorf("validateReceiveSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr) + } + }) + } +}