Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): Receive settings #3195

Merged
merged 3 commits into from Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 63 additions & 3 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -29,11 +29,13 @@ const (
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes.
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
MaxPublishRequestBytes = 3500000
)

// PublishSettings control the batching of published messages.
// PublishSettings control the batching of published messages. These settings
// apply per partition.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed. Must be > 0.
DelayThreshold time.Duration
Expand Down Expand Up @@ -71,7 +73,7 @@ var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
Timeout: 10 * time.Minute,
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
Expand Down Expand Up @@ -101,3 +103,61 @@ func validatePublishSettings(settings PublishSettings) error {
}
return nil
}

// ReceiveSettings control the receiving of messages. These settings apply
// per partition.
type ReceiveSettings struct {
// MaxOutstandingMessages is the maximum number of unacknowledged messages.
// Must be > 0.
MaxOutstandingMessages int

// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
// messages. Must be > 0.
MaxOutstandingBytes int

// The maximum time that the client will attempt to establish a subscribe
// stream connection to the server. Must be > 0.
//
// The timeout is exceeded, the subscriber will terminate with the last error
// that occurred while trying to reconnect.
Timeout time.Duration

// The topic partition numbers (zero-indexed) to receive messages from.
// Values must be less than the number of partitions for the topic. If not
// specified, the client will use the partition assignment service to
// determine which partitions it should connect to.
Partitions []int
}

// DefaultReceiveSettings holds the default values for ReceiveSettings.
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
Timeout: 10 * time.Minute,
}

func validateReceiveSettings(settings ReceiveSettings) error {
if settings.MaxOutstandingMessages <= 0 {
return errors.New("pubsublite: invalid receive settings. MaxOutstandingMessages must be > 0")
}
if settings.MaxOutstandingBytes <= 0 {
return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0")
}
if settings.Timeout <= 0 {
return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0")
}
if len(settings.Partitions) > 0 {
var void struct{}
partitionMap := make(map[int]struct{})
for _, p := range settings.Partitions {
if p < 0 {
return fmt.Errorf("pubsublite: invalid partition number %d in receive settings. Partition numbers are zero-indexed", p)
}
if _, exists := partitionMap[p]; exists {
return fmt.Errorf("pubsublite: invalid receive settings. Duplicate partition number %d", p)
}
partitionMap[p] = void
}
}
return nil
}
119 changes: 87 additions & 32 deletions pubsublite/internal/wire/settings_test.go
Expand Up @@ -21,94 +21,149 @@ import (
func TestValidatePublishSettings(t *testing.T) {
for _, tc := range []struct {
desc string
// settingsFunc is passed DefaultPublishSettings
settingsFunc func(settings PublishSettings) PublishSettings
wantErr bool
// mutateSettings is passed a copy of DefaultPublishSettings to mutate.
mutateSettings func(settings *PublishSettings)
wantErr bool
}{
{
desc: "valid: default",
settingsFunc: func(settings PublishSettings) PublishSettings {
return DefaultPublishSettings
},
wantErr: false,
desc: "valid: default",
mutateSettings: func(settings *PublishSettings) {},
wantErr: false,
},
{
desc: "valid: max",
settingsFunc: func(settings PublishSettings) PublishSettings {
return PublishSettings{
CountThreshold: MaxPublishRequestCount,
ByteThreshold: MaxPublishRequestBytes,
// These have no max bounds, check large values.
DelayThreshold: 24 * time.Hour,
Timeout: 24 * time.Hour,
BufferedByteLimit: 1e10,
}
mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = MaxPublishRequestCount
settings.ByteThreshold = MaxPublishRequestBytes
// These have no max bounds, check large values.
settings.DelayThreshold = 24 * time.Hour
settings.Timeout = 24 * time.Hour
settings.BufferedByteLimit = 1e10
},
wantErr: false,
},
{
desc: "invalid: zero CountThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: CountThreshold over MaxPublishRequestCount",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = MaxPublishRequestCount + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: ByteThreshold over MaxPublishRequestBytes",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.ByteThreshold = MaxPublishRequestBytes + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: zero ByteThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.ByteThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: zero DelayThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.DelayThreshold = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero Timeout",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.Timeout = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero BufferedByteLimit",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.BufferedByteLimit = 0
return settings
},
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
settings := tc.settingsFunc(DefaultPublishSettings)
settings := DefaultPublishSettings
tc.mutateSettings(&settings)

gotErr := validatePublishSettings(settings)
if (gotErr != nil) != tc.wantErr {
t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
}
})
}
}

func TestValidateReceiveSettings(t *testing.T) {
for _, tc := range []struct {
desc string
// mutateSettings is passed a copy of DefaultReceiveSettings to mutate.
mutateSettings func(settings *ReceiveSettings)
wantErr bool
}{
{
desc: "valid: default",
mutateSettings: func(settings *ReceiveSettings) {},
wantErr: false,
},
{
desc: "valid: max",
mutateSettings: func(settings *ReceiveSettings) {
settings.Partitions = []int{5, 3, 9, 1, 4, 0}
// These have no max bounds, check large values.
settings.MaxOutstandingMessages = 100000
settings.MaxOutstandingBytes = 1e10
settings.Timeout = 24 * time.Hour
},
wantErr: false,
},
{
desc: "invalid: zero MaxOutstandingMessages",
mutateSettings: func(settings *ReceiveSettings) {
settings.MaxOutstandingMessages = 0
},
wantErr: true,
},
{
desc: "invalid: zero MaxOutstandingBytes",
mutateSettings: func(settings *ReceiveSettings) {
settings.MaxOutstandingBytes = 0
},
wantErr: true,
},
{
desc: "invalid: negative partition",
mutateSettings: func(settings *ReceiveSettings) {
settings.Partitions = []int{0, -1}
},
wantErr: true,
},
{
desc: "invalid: duplicate partition",
mutateSettings: func(settings *ReceiveSettings) {
settings.Partitions = []int{0, 1, 2, 3, 4, 1}
},
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
settings := DefaultReceiveSettings
tc.mutateSettings(&settings)

gotErr := validateReceiveSettings(settings)
if (gotErr != nil) != tc.wantErr {
t.Errorf("validateReceiveSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
}
})
}
}