From b88a1b8c2dc7bb877a29f74903ade6f40965caf8 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 9 Dec 2020 17:01:22 -0500 Subject: [PATCH] Treat zero settings as default settings. Add unit tests --- pubsublite/ps/doc.go | 7 +- pubsublite/ps/settings.go | 85 +++++++++++++------ pubsublite/ps/settings_test.go | 151 +++++++++++++++++++++++++++++++++ 3 files changed, 215 insertions(+), 28 deletions(-) create mode 100644 pubsublite/ps/settings_test.go diff --git a/pubsublite/ps/doc.go b/pubsublite/ps/doc.go index 73d552c3f41b..1a156eaff482 100644 --- a/pubsublite/ps/doc.go +++ b/pubsublite/ps/doc.go @@ -27,10 +27,9 @@ As noted in comments, the two services have some differences: every message received. - Pub/Sub Lite PublisherClients can terminate when an unretryable error occurs. - - Pub/Sub Lite has publish and subscribe throughput limits. Thus publishing - can be more sensitive to buffer overflow. - - DefaultPublishSettings and DefaultReceiveSettings should be used for default - settings rather than their empty types. + - Publishers and subscribers will be throttled if Pub/Sub Lite publish or + subscribe throughput limits are exceeded. Thus publishing can be more + sensitive to buffer overflow than Cloud Pub/Sub. For more information about Cloud Pub/Sub Lite, see https://cloud.google.com/pubsub/lite/docs. diff --git a/pubsublite/ps/settings.go b/pubsublite/ps/settings.go index 74cadad4cf5d..a5e43de44dc7 100644 --- a/pubsublite/ps/settings.go +++ b/pubsublite/ps/settings.go @@ -44,22 +44,27 @@ type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) erro // PublishSettings control the batching of published messages. These settings // apply per partition. // -// Use DefaultPublishSettings for defaults, as an empty PublishSettings will -// fail validation. +// A zero PublishSettings will result in values equivalent to +// DefaultPublishSettings. type PublishSettings struct { - // Publish a non-empty batch after this delay has passed. Must be > 0. + // Publish a non-empty batch after this delay has passed. If DelayThreshold is + // 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise + // must be > 0. DelayThreshold time.Duration - // Publish a batch when it has this many messages. Must be > 0. The maximum is - // MaxPublishRequestCount. + // Publish a batch when it has this many messages. The maximum is + // MaxPublishRequestCount. If CountThreshold is 0, it will be treated as + // DefaultPublishSettings.CountThreshold. Otherwise must be > 0. CountThreshold int - // Publish a batch when its size in bytes reaches this value. Must be > 0. The - // maximum is MaxPublishRequestBytes. + // Publish a batch when its size in bytes reaches this value. The maximum is + // MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as + // DefaultPublishSettings.ByteThreshold. Otherwise must be > 0. ByteThreshold int // The maximum time that the client will attempt to establish a publish stream - // connection to the server. Must be > 0. + // connection to the server. If Timeout is 0, it will be treated as + // DefaultPublishSettings.Timeout. Otherwise must be > 0. // // The timeout is exceeded, the publisher will terminate with the last error // that occurred while trying to reconnect. Note that if the timeout duration @@ -67,7 +72,8 @@ type PublishSettings struct { Timeout time.Duration // The maximum number of bytes that the publisher will keep in memory before - // returning ErrOverflow. Must be > 0. + // returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as + // DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0. // // Note that Pub/Sub Lite topics are provisioned a publishing throughput // capacity, per partition, shared by all publisher clients. Setting a large @@ -96,15 +102,32 @@ var DefaultPublishSettings = PublishSettings{ } func (s *PublishSettings) toWireSettings() wire.PublishSettings { - return wire.PublishSettings{ - DelayThreshold: s.DelayThreshold, - CountThreshold: s.CountThreshold, - ByteThreshold: s.ByteThreshold, - Timeout: s.Timeout, - BufferedByteLimit: s.BufferedByteLimit, + wireSettings := wire.PublishSettings{ + DelayThreshold: DefaultPublishSettings.DelayThreshold, + CountThreshold: DefaultPublishSettings.CountThreshold, + ByteThreshold: DefaultPublishSettings.ByteThreshold, + Timeout: DefaultPublishSettings.Timeout, + BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit, ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, Framework: wire.FrameworkCloudPubSubShim, } + // Negative values preserved, but will fail validation in wire package. + if s.DelayThreshold != 0 { + wireSettings.DelayThreshold = s.DelayThreshold + } + if s.CountThreshold != 0 { + wireSettings.CountThreshold = s.CountThreshold + } + if s.ByteThreshold != 0 { + wireSettings.ByteThreshold = s.ByteThreshold + } + if s.Timeout != 0 { + wireSettings.Timeout = s.Timeout + } + if s.BufferedByteLimit != 0 { + wireSettings.BufferedByteLimit = s.BufferedByteLimit + } + return wireSettings } // NackHandler is invoked when pubsub.Message.Nack() is called. Cloud Pub/Sub @@ -126,19 +149,22 @@ type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) e // partition. If MaxOutstandingBytes is being used to bound memory usage, keep // in mind the number of partitions in the associated topic. // -// Use DefaultReceiveSettings for defaults, as an empty ReceiveSettings will -// fail validation. +// A zero ReceiveSettings will result in values equivalent to +// DefaultReceiveSettings. type ReceiveSettings struct { // MaxOutstandingMessages is the maximum number of unacknowledged messages. - // Must be > 0. + // If MaxOutstandingMessages is 0, it will be treated as + // DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0. MaxOutstandingMessages int // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged - // messages. Must be > 0. + // messages. If MaxOutstandingBytes is 0, it will be treated as + // DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0. MaxOutstandingBytes int // The maximum time that the client will attempt to establish a subscribe - // stream connection to the server. Must be > 0. + // stream connection to the server. If Timeout is 0, it will be treated as + // DefaultReceiveSettings.Timeout. Otherwise must be > 0. // // The timeout is exceeded, the SubscriberClient will terminate with the last // error that occurred while trying to reconnect. @@ -167,11 +193,22 @@ var DefaultReceiveSettings = ReceiveSettings{ } func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings { - return wire.ReceiveSettings{ - MaxOutstandingMessages: s.MaxOutstandingMessages, - MaxOutstandingBytes: s.MaxOutstandingBytes, - Timeout: s.Timeout, + wireSettings := wire.ReceiveSettings{ + MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages, + MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes, + Timeout: DefaultReceiveSettings.Timeout, Partitions: s.Partitions, Framework: wire.FrameworkCloudPubSubShim, } + // Negative values preserved, but will fail validation in wire package. + if s.MaxOutstandingMessages != 0 { + wireSettings.MaxOutstandingMessages = s.MaxOutstandingMessages + } + if s.MaxOutstandingBytes != 0 { + wireSettings.MaxOutstandingBytes = s.MaxOutstandingBytes + } + if s.Timeout != 0 { + wireSettings.Timeout = s.Timeout + } + return wireSettings } diff --git a/pubsublite/ps/settings_test.go b/pubsublite/ps/settings_test.go new file mode 100644 index 000000000000..c71f3f15ae17 --- /dev/null +++ b/pubsublite/ps/settings_test.go @@ -0,0 +1,151 @@ +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "testing" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsublite/internal/wire" +) + +func TestPublishSettingsToWireSettings(t *testing.T) { + for _, tc := range []struct { + desc string + settings PublishSettings + wantSettings wire.PublishSettings + }{ + { + desc: "default settings", + settings: DefaultPublishSettings, + wantSettings: wire.PublishSettings{ + DelayThreshold: DefaultPublishSettings.DelayThreshold, + CountThreshold: DefaultPublishSettings.CountThreshold, + ByteThreshold: DefaultPublishSettings.ByteThreshold, + Timeout: DefaultPublishSettings.Timeout, + BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "zero settings", + settings: PublishSettings{}, + wantSettings: DefaultPublishSettings.toWireSettings(), + }, + { + desc: "positive values", + settings: PublishSettings{ + DelayThreshold: 2, + CountThreshold: 3, + ByteThreshold: 4, + Timeout: 5, + BufferedByteLimit: 6, + }, + wantSettings: wire.PublishSettings{ + DelayThreshold: 2, + CountThreshold: 3, + ByteThreshold: 4, + Timeout: 5, + BufferedByteLimit: 6, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "negative values", + settings: PublishSettings{ + DelayThreshold: -2, + CountThreshold: -3, + ByteThreshold: -4, + Timeout: -5, + BufferedByteLimit: -6, + }, + wantSettings: wire.PublishSettings{ + DelayThreshold: -2, + CountThreshold: -3, + ByteThreshold: -4, + Timeout: -5, + BufferedByteLimit: -6, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" { + t.Errorf("PublishSettings.toWireSettings() got: -, want: +\n%s", diff) + } + }) + } +} + +func TestReceiveSettingsToWireSettings(t *testing.T) { + for _, tc := range []struct { + desc string + settings ReceiveSettings + wantSettings wire.ReceiveSettings + }{ + { + desc: "default settings", + settings: DefaultReceiveSettings, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages, + MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes, + Timeout: DefaultReceiveSettings.Timeout, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "zero settings", + settings: ReceiveSettings{}, + wantSettings: DefaultReceiveSettings.toWireSettings(), + }, + { + desc: "positive values", + settings: ReceiveSettings{ + MaxOutstandingMessages: 2, + MaxOutstandingBytes: 3, + Timeout: 4, + Partitions: []int{5, 6}, + }, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: 2, + MaxOutstandingBytes: 3, + Timeout: 4, + Partitions: []int{5, 6}, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "negative values", + settings: ReceiveSettings{ + MaxOutstandingMessages: -2, + MaxOutstandingBytes: -3, + Timeout: -4, + Partitions: []int{-5, -6}, + }, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: -2, + MaxOutstandingBytes: -3, + Timeout: -4, + Partitions: []int{-5, -6}, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" { + t.Errorf("ReceiveSettings.toWireSettings() got: -, want: +\n%s", diff) + } + }) + } +}