Skip to content

Commit

Permalink
feat(pubsublite): Receive settings (#3195)
Browse files Browse the repository at this point in the history
These will be used by subscribers.
  • Loading branch information
tmdiep committed Nov 13, 2020
1 parent 99f5ac7 commit bd837fc
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 35 deletions.
66 changes: 63 additions & 3 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -29,11 +29,13 @@ const (
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes.
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
MaxPublishRequestBytes = 3500000
)

// PublishSettings control the batching of published messages.
// PublishSettings control the batching of published messages. These settings
// apply per partition.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed. Must be > 0.
DelayThreshold time.Duration
Expand Down Expand Up @@ -71,7 +73,7 @@ var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
Timeout: 10 * time.Minute,
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
Expand Down Expand Up @@ -101,3 +103,61 @@ func validatePublishSettings(settings PublishSettings) error {
}
return nil
}

// ReceiveSettings control the receiving of messages. These settings apply
// per partition.
type ReceiveSettings struct {
// MaxOutstandingMessages is the maximum number of unacknowledged messages.
// Must be > 0.
MaxOutstandingMessages int

// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
// messages. Must be > 0.
MaxOutstandingBytes int

// The maximum time that the client will attempt to establish a subscribe
// stream connection to the server. Must be > 0.
//
// The timeout is exceeded, the subscriber will terminate with the last error
// that occurred while trying to reconnect.
Timeout time.Duration

// The topic partition numbers (zero-indexed) to receive messages from.
// Values must be less than the number of partitions for the topic. If not
// specified, the client will use the partition assignment service to
// determine which partitions it should connect to.
Partitions []int
}

// DefaultReceiveSettings holds the default values for ReceiveSettings.
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
Timeout: 10 * time.Minute,
}

func validateReceiveSettings(settings ReceiveSettings) error {
if settings.MaxOutstandingMessages <= 0 {
return errors.New("pubsublite: invalid receive settings. MaxOutstandingMessages must be > 0")
}
if settings.MaxOutstandingBytes <= 0 {
return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0")
}
if settings.Timeout <= 0 {
return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0")
}
if len(settings.Partitions) > 0 {
var void struct{}
partitionMap := make(map[int]struct{})
for _, p := range settings.Partitions {
if p < 0 {
return fmt.Errorf("pubsublite: invalid partition number %d in receive settings. Partition numbers are zero-indexed", p)
}
if _, exists := partitionMap[p]; exists {
return fmt.Errorf("pubsublite: invalid receive settings. Duplicate partition number %d", p)
}
partitionMap[p] = void
}
}
return nil
}
119 changes: 87 additions & 32 deletions pubsublite/internal/wire/settings_test.go
Expand Up @@ -21,94 +21,149 @@ import (
func TestValidatePublishSettings(t *testing.T) {
for _, tc := range []struct {
desc string
// settingsFunc is passed DefaultPublishSettings
settingsFunc func(settings PublishSettings) PublishSettings
wantErr bool
// mutateSettings is passed a copy of DefaultPublishSettings to mutate.
mutateSettings func(settings *PublishSettings)
wantErr bool
}{
{
desc: "valid: default",
settingsFunc: func(settings PublishSettings) PublishSettings {
return DefaultPublishSettings
},
wantErr: false,
desc: "valid: default",
mutateSettings: func(settings *PublishSettings) {},
wantErr: false,
},
{
desc: "valid: max",
settingsFunc: func(settings PublishSettings) PublishSettings {
return PublishSettings{
CountThreshold: MaxPublishRequestCount,
ByteThreshold: MaxPublishRequestBytes,
// These have no max bounds, check large values.
DelayThreshold: 24 * time.Hour,
Timeout: 24 * time.Hour,
BufferedByteLimit: 1e10,
}
mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = MaxPublishRequestCount
settings.ByteThreshold = MaxPublishRequestBytes
// These have no max bounds, check large values.
settings.DelayThreshold = 24 * time.Hour
settings.Timeout = 24 * time.Hour
settings.BufferedByteLimit = 1e10
},
wantErr: false,
},
{
desc: "invalid: zero CountThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: CountThreshold over MaxPublishRequestCount",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = MaxPublishRequestCount + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: ByteThreshold over MaxPublishRequestBytes",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.ByteThreshold = MaxPublishRequestBytes + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: zero ByteThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.ByteThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: zero DelayThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.DelayThreshold = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero Timeout",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.Timeout = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero BufferedByteLimit",
settingsFunc: func(settings PublishSettings) PublishSettings {
mutateSettings: func(settings *PublishSettings) {
settings.BufferedByteLimit = 0
return settings
},
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
settings := tc.settingsFunc(DefaultPublishSettings)
settings := DefaultPublishSettings
tc.mutateSettings(&settings)

gotErr := validatePublishSettings(settings)
if (gotErr != nil) != tc.wantErr {
t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
}
})
}
}

func TestValidateReceiveSettings(t *testing.T) {
for _, tc := range []struct {
desc string
// mutateSettings is passed a copy of DefaultReceiveSettings to mutate.
mutateSettings func(settings *ReceiveSettings)
wantErr bool
}{
{
desc: "valid: default",
mutateSettings: func(settings *ReceiveSettings) {},
wantErr: false,
},
{
desc: "valid: max",
mutateSettings: func(settings *ReceiveSettings) {
settings.Partitions = []int{5, 3, 9, 1, 4, 0}
// These have no max bounds, check large values.
settings.MaxOutstandingMessages = 100000
settings.MaxOutstandingBytes = 1e10
settings.Timeout = 24 * time.Hour
},
wantErr: false,
},
{
desc: "invalid: zero MaxOutstandingMessages",
mutateSettings: func(settings *ReceiveSettings) {
settings.MaxOutstandingMessages = 0
},
wantErr: true,
},
{
desc: "invalid: zero MaxOutstandingBytes",
mutateSettings: func(settings *ReceiveSettings) {
settings.MaxOutstandingBytes = 0
},
wantErr: true,
},
{
desc: "invalid: negative partition",
mutateSettings: func(settings *ReceiveSettings) {
settings.Partitions = []int{0, -1}
},
wantErr: true,
},
{
desc: "invalid: duplicate partition",
mutateSettings: func(settings *ReceiveSettings) {
settings.Partitions = []int{0, 1, 2, 3, 4, 1}
},
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
settings := DefaultReceiveSettings
tc.mutateSettings(&settings)

gotErr := validateReceiveSettings(settings)
if (gotErr != nil) != tc.wantErr {
t.Errorf("validateReceiveSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
}
})
}
}

0 comments on commit bd837fc

Please sign in to comment.