diff --git a/pubsublite/errors.go b/pubsublite/errors.go new file mode 100644 index 00000000000..1101188e5a9 --- /dev/null +++ b/pubsublite/errors.go @@ -0,0 +1,22 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import "errors" + +var ( + // ErrOverflow indicates that the publish buffers have overflowed. See + // comments for PublishSettings.BufferedByteLimit. + ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed") +) diff --git a/pubsublite/settings.go b/pubsublite/settings.go new file mode 100644 index 00000000000..b1db10fac5f --- /dev/null +++ b/pubsublite/settings.go @@ -0,0 +1,103 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "errors" + "fmt" + "time" +) + +const ( + // MaxPublishRequestCount is the maximum number of messages that can be + // batched in a single publish request. + MaxPublishRequestCount = 1000 + + // MaxPublishMessageBytes is the maximum allowed serialized size of a single + // Pub/Sub message in bytes. + MaxPublishMessageBytes = 1000000 + + // MaxPublishRequestBytes is the maximum allowed serialized size of a single + // publish request (containing a batch of messages) in bytes. + MaxPublishRequestBytes = 3500000 +) + +// PublishSettings control the batching of published messages. +type PublishSettings struct { + // Publish a non-empty batch after this delay has passed. Must be > 0. + DelayThreshold time.Duration + + // Publish a batch when it has this many messages. Must be > 0. The maximum is + // MaxPublishRequestCount. + CountThreshold int + + // Publish a batch when its size in bytes reaches this value. Must be > 0. The + // maximum is MaxPublishRequestBytes. + ByteThreshold int + + // The maximum time that the client will attempt to establish a publish stream + // connection to the server. Must be > 0. + // + // The timeout is exceeded, the publisher will terminate with the last error + // that occurred while trying to reconnect. Note that if the timeout duration + // is long, ErrOverflow may occur first. + Timeout time.Duration + + // The maximum number of bytes that the publisher will keep in memory before + // returning ErrOverflow. Must be > 0. + // + // Note that Pub/Sub Lite topics are provisioned a publishing throughput + // capacity, per partition, shared by all publisher clients. Setting a large + // buffer size can mitigate transient publish spikes. However, consistently + // attempting to publish messages at a much higher rate than the publishing + // throughput capacity can cause the buffers to overflow. For more + // information, see https://cloud.google.com/pubsub/lite/docs/topics. + BufferedByteLimit int +} + +// DefaultPublishSettings holds the default values for PublishSettings. +var DefaultPublishSettings = PublishSettings{ + DelayThreshold: 10 * time.Millisecond, + CountThreshold: 100, + ByteThreshold: 1e6, + Timeout: 60 * time.Second, + // By default set to a high limit that is not likely to occur, but prevents + // OOM errors in clients. + BufferedByteLimit: 1 << 30, // 1 GiB +} + +func validatePublishSettings(settings PublishSettings) error { + if settings.DelayThreshold <= 0 { + return errors.New("pubsublite: invalid publish settings. DelayThreshold duration must be > 0") + } + if settings.Timeout <= 0 { + return errors.New("pubsublite: invalid publish settings. Timeout duration must be > 0") + } + if settings.CountThreshold <= 0 { + return errors.New("pubsublite: invalid publish settings. CountThreshold must be > 0") + } + if settings.CountThreshold > MaxPublishRequestCount { + return fmt.Errorf("pubsublite: invalid publish settings. Maximum CountThreshold is MaxPublishRequestCount (%d)", MaxPublishRequestCount) + } + if settings.ByteThreshold <= 0 { + return errors.New("pubsublite: invalid publish settings. ByteThreshold must be > 0") + } + if settings.ByteThreshold > MaxPublishRequestBytes { + return fmt.Errorf("pubsublite: invalid publish settings. Maximum ByteThreshold is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes) + } + if settings.BufferedByteLimit <= 0 { + return errors.New("pubsublite: invalid publish settings. BufferedByteLimit must be > 0") + } + return nil +} diff --git a/pubsublite/settings_test.go b/pubsublite/settings_test.go new file mode 100644 index 00000000000..6c49880ed60 --- /dev/null +++ b/pubsublite/settings_test.go @@ -0,0 +1,114 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "testing" + "time" +) + +func TestValidatePublishSettings(t *testing.T) { + for _, tc := range []struct { + desc string + // settingsFunc is passed DefaultPublishSettings + settingsFunc func(settings PublishSettings) PublishSettings + wantErr bool + }{ + { + desc: "valid: default", + settingsFunc: func(settings PublishSettings) PublishSettings { + return DefaultPublishSettings + }, + wantErr: false, + }, + { + desc: "valid: max", + settingsFunc: func(settings PublishSettings) PublishSettings { + return PublishSettings{ + CountThreshold: MaxPublishRequestCount, + ByteThreshold: MaxPublishRequestBytes, + // These have no max bounds, check large values. + DelayThreshold: 24 * time.Hour, + Timeout: 24 * time.Hour, + BufferedByteLimit: 1e10, + } + }, + wantErr: false, + }, + { + desc: "invalid: zero CountThreshold", + settingsFunc: func(settings PublishSettings) PublishSettings { + settings.CountThreshold = 0 + return settings + }, + wantErr: true, + }, + { + desc: "invalid: CountThreshold over MaxPublishRequestCount", + settingsFunc: func(settings PublishSettings) PublishSettings { + settings.CountThreshold = MaxPublishRequestCount + 1 + return settings + }, + wantErr: true, + }, + { + desc: "invalid: ByteThreshold over MaxPublishRequestBytes", + settingsFunc: func(settings PublishSettings) PublishSettings { + settings.ByteThreshold = MaxPublishRequestBytes + 1 + return settings + }, + wantErr: true, + }, + { + desc: "invalid: zero ByteThreshold", + settingsFunc: func(settings PublishSettings) PublishSettings { + settings.ByteThreshold = 0 + return settings + }, + wantErr: true, + }, + { + desc: "invalid: zero DelayThreshold", + settingsFunc: func(settings PublishSettings) PublishSettings { + settings.DelayThreshold = time.Duration(0) + return settings + }, + wantErr: true, + }, + { + desc: "invalid: zero Timeout", + settingsFunc: func(settings PublishSettings) PublishSettings { + settings.Timeout = time.Duration(0) + return settings + }, + wantErr: true, + }, + { + desc: "invalid: zero BufferedByteLimit", + settingsFunc: func(settings PublishSettings) PublishSettings { + settings.BufferedByteLimit = 0 + return settings + }, + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + settings := tc.settingsFunc(DefaultPublishSettings) + gotErr := validatePublishSettings(settings) + if (gotErr != nil) != tc.wantErr { + t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr) + } + }) + } +}