Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): Publish settings and errors #3075

Merged
merged 9 commits into from Oct 28, 2020
22 changes: 22 additions & 0 deletions pubsublite/errors.go
@@ -0,0 +1,22 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import "errors"

var (
// ErrOverflow indicates that the publish buffers have overflowed. See
// comments for PublishSettings.BufferedByteLimit.
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")
)
103 changes: 103 additions & 0 deletions pubsublite/settings.go
@@ -0,0 +1,103 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"errors"
"fmt"
"time"
)

const (
// MaxPublishRequestCount is the maximum number of messages that can be
// batched in a single publish request.
MaxPublishRequestCount = 1000

// MaxPublishMessageBytes is the maximum allowed serialized size of a single
// Pub/Sub message in bytes.
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes.
MaxPublishRequestBytes = 3500000
)

// PublishSettings control the batching of published messages.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed. Must be > 0.
DelayThreshold time.Duration

// Publish a batch when it has this many messages. Must be > 0. The maximum is
// MaxPublishRequestCount.
CountThreshold int

// Publish a batch when its size in bytes reaches this value. Must be > 0. The
// maximum is MaxPublishRequestBytes.
ByteThreshold int

// The maximum time that the client will attempt to establish a publish stream
// connection to the server. Must be > 0.
//
// The timeout is exceeded, the publisher will terminate with the last error
// that occurred while trying to reconnect. Note that if the timeout duration
// is long, ErrOverflow may occur first.
Timeout time.Duration

// The maximum number of bytes that the publisher will keep in memory before
// returning ErrOverflow. Must be > 0.
//
// Note that Pub/Sub Lite topics are provisioned a publishing throughput
// capacity, per partition, shared by all publisher clients. Setting a large
// buffer size can mitigate transient publish spikes. However, consistently
// attempting to publish messages at a much higher rate than the publishing
// throughput capacity can cause the buffers to overflow. For more
// information, see https://cloud.google.com/pubsub/lite/docs/topics.
BufferedByteLimit int
}

// DefaultPublishSettings holds the default values for PublishSettings.
var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
}

func validatePublishSettings(settings PublishSettings) error {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
if settings.DelayThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. DelayThreshold duration must be > 0")
}
if settings.Timeout <= 0 {
return errors.New("pubsublite: invalid publish settings. Timeout duration must be > 0")
}
if settings.CountThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. CountThreshold must be > 0")
}
if settings.CountThreshold > MaxPublishRequestCount {
return fmt.Errorf("pubsublite: invalid publish settings. Maximum CountThreshold is MaxPublishRequestCount (%d)", MaxPublishRequestCount)
}
if settings.ByteThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. ByteThreshold must be > 0")
}
if settings.ByteThreshold > MaxPublishRequestBytes {
return fmt.Errorf("pubsublite: invalid publish settings. Maximum ByteThreshold is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)
}
if settings.BufferedByteLimit <= 0 {
return errors.New("pubsublite: invalid publish settings. BufferedByteLimit must be > 0")
}
return nil
}
114 changes: 114 additions & 0 deletions pubsublite/settings_test.go
@@ -0,0 +1,114 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"testing"
"time"
)

func TestValidatePublishSettings(t *testing.T) {
for _, tc := range []struct {
desc string
// settingsFunc is passed DefaultPublishSettings
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
settingsFunc func(settings PublishSettings) PublishSettings
wantErr bool
}{
{
desc: "valid: default",
settingsFunc: func(settings PublishSettings) PublishSettings {
return DefaultPublishSettings
},
wantErr: false,
},
{
desc: "valid: max",
settingsFunc: func(settings PublishSettings) PublishSettings {
return PublishSettings{
CountThreshold: MaxPublishRequestCount,
ByteThreshold: MaxPublishRequestBytes,
// These have no max bounds, check large values.
DelayThreshold: 24 * time.Hour,
Timeout: 24 * time.Hour,
BufferedByteLimit: 1e10,
}
},
wantErr: false,
},
{
desc: "invalid: zero CountThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.CountThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: CountThreshold over MaxPublishRequestCount",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.CountThreshold = MaxPublishRequestCount + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: ByteThreshold over MaxPublishRequestBytes",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.ByteThreshold = MaxPublishRequestBytes + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: zero ByteThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.ByteThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: zero DelayThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.DelayThreshold = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero Timeout",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.Timeout = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero BufferedByteLimit",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.BufferedByteLimit = 0
return settings
},
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
settings := tc.settingsFunc(DefaultPublishSettings)
gotErr := validatePublishSettings(settings)
if (gotErr != nil) != tc.wantErr {
t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
}
})
}
}