Skip to content

Commit

Permalink
Treat zero settings as default settings. Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Dec 9, 2020
1 parent 36a556c commit b88a1b8
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 28 deletions.
7 changes: 3 additions & 4 deletions pubsublite/ps/doc.go
Expand Up @@ -27,10 +27,9 @@ As noted in comments, the two services have some differences:
every message received.
- Pub/Sub Lite PublisherClients can terminate when an unretryable error
occurs.
- Pub/Sub Lite has publish and subscribe throughput limits. Thus publishing
can be more sensitive to buffer overflow.
- DefaultPublishSettings and DefaultReceiveSettings should be used for default
settings rather than their empty types.
- Publishers and subscribers will be throttled if Pub/Sub Lite publish or
subscribe throughput limits are exceeded. Thus publishing can be more
sensitive to buffer overflow than Cloud Pub/Sub.
For more information about Cloud Pub/Sub Lite, see
https://cloud.google.com/pubsub/lite/docs.
Expand Down
85 changes: 61 additions & 24 deletions pubsublite/ps/settings.go
Expand Up @@ -44,30 +44,36 @@ type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) erro
// PublishSettings control the batching of published messages. These settings
// apply per partition.
//
// Use DefaultPublishSettings for defaults, as an empty PublishSettings will
// fail validation.
// A zero PublishSettings will result in values equivalent to
// DefaultPublishSettings.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed. Must be > 0.
// Publish a non-empty batch after this delay has passed. If DelayThreshold is
// 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise
// must be > 0.
DelayThreshold time.Duration

// Publish a batch when it has this many messages. Must be > 0. The maximum is
// MaxPublishRequestCount.
// Publish a batch when it has this many messages. The maximum is
// MaxPublishRequestCount. If CountThreshold is 0, it will be treated as
// DefaultPublishSettings.CountThreshold. Otherwise must be > 0.
CountThreshold int

// Publish a batch when its size in bytes reaches this value. Must be > 0. The
// maximum is MaxPublishRequestBytes.
// Publish a batch when its size in bytes reaches this value. The maximum is
// MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as
// DefaultPublishSettings.ByteThreshold. Otherwise must be > 0.
ByteThreshold int

// The maximum time that the client will attempt to establish a publish stream
// connection to the server. Must be > 0.
// connection to the server. If Timeout is 0, it will be treated as
// DefaultPublishSettings.Timeout. Otherwise must be > 0.
//
// The timeout is exceeded, the publisher will terminate with the last error
// that occurred while trying to reconnect. Note that if the timeout duration
// is long, ErrOverflow may occur first.
Timeout time.Duration

// The maximum number of bytes that the publisher will keep in memory before
// returning ErrOverflow. Must be > 0.
// returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as
// DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0.
//
// Note that Pub/Sub Lite topics are provisioned a publishing throughput
// capacity, per partition, shared by all publisher clients. Setting a large
Expand Down Expand Up @@ -96,15 +102,32 @@ var DefaultPublishSettings = PublishSettings{
}

func (s *PublishSettings) toWireSettings() wire.PublishSettings {
return wire.PublishSettings{
DelayThreshold: s.DelayThreshold,
CountThreshold: s.CountThreshold,
ByteThreshold: s.ByteThreshold,
Timeout: s.Timeout,
BufferedByteLimit: s.BufferedByteLimit,
wireSettings := wire.PublishSettings{
DelayThreshold: DefaultPublishSettings.DelayThreshold,
CountThreshold: DefaultPublishSettings.CountThreshold,
ByteThreshold: DefaultPublishSettings.ByteThreshold,
Timeout: DefaultPublishSettings.Timeout,
BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit,
ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod,
Framework: wire.FrameworkCloudPubSubShim,
}
// Negative values preserved, but will fail validation in wire package.
if s.DelayThreshold != 0 {
wireSettings.DelayThreshold = s.DelayThreshold
}
if s.CountThreshold != 0 {
wireSettings.CountThreshold = s.CountThreshold
}
if s.ByteThreshold != 0 {
wireSettings.ByteThreshold = s.ByteThreshold
}
if s.Timeout != 0 {
wireSettings.Timeout = s.Timeout
}
if s.BufferedByteLimit != 0 {
wireSettings.BufferedByteLimit = s.BufferedByteLimit
}
return wireSettings
}

// NackHandler is invoked when pubsub.Message.Nack() is called. Cloud Pub/Sub
Expand All @@ -126,19 +149,22 @@ type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) e
// partition. If MaxOutstandingBytes is being used to bound memory usage, keep
// in mind the number of partitions in the associated topic.
//
// Use DefaultReceiveSettings for defaults, as an empty ReceiveSettings will
// fail validation.
// A zero ReceiveSettings will result in values equivalent to
// DefaultReceiveSettings.
type ReceiveSettings struct {
// MaxOutstandingMessages is the maximum number of unacknowledged messages.
// Must be > 0.
// If MaxOutstandingMessages is 0, it will be treated as
// DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0.
MaxOutstandingMessages int

// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
// messages. Must be > 0.
// messages. If MaxOutstandingBytes is 0, it will be treated as
// DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0.
MaxOutstandingBytes int

// The maximum time that the client will attempt to establish a subscribe
// stream connection to the server. Must be > 0.
// stream connection to the server. If Timeout is 0, it will be treated as
// DefaultReceiveSettings.Timeout. Otherwise must be > 0.
//
// The timeout is exceeded, the SubscriberClient will terminate with the last
// error that occurred while trying to reconnect.
Expand Down Expand Up @@ -167,11 +193,22 @@ var DefaultReceiveSettings = ReceiveSettings{
}

func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings {
return wire.ReceiveSettings{
MaxOutstandingMessages: s.MaxOutstandingMessages,
MaxOutstandingBytes: s.MaxOutstandingBytes,
Timeout: s.Timeout,
wireSettings := wire.ReceiveSettings{
MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages,
MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes,
Timeout: DefaultReceiveSettings.Timeout,
Partitions: s.Partitions,
Framework: wire.FrameworkCloudPubSubShim,
}
// Negative values preserved, but will fail validation in wire package.
if s.MaxOutstandingMessages != 0 {
wireSettings.MaxOutstandingMessages = s.MaxOutstandingMessages
}
if s.MaxOutstandingBytes != 0 {
wireSettings.MaxOutstandingBytes = s.MaxOutstandingBytes
}
if s.Timeout != 0 {
wireSettings.Timeout = s.Timeout
}
return wireSettings
}
151 changes: 151 additions & 0 deletions pubsublite/ps/settings_test.go
@@ -0,0 +1,151 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps

import (
"testing"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/wire"
)

func TestPublishSettingsToWireSettings(t *testing.T) {
for _, tc := range []struct {
desc string
settings PublishSettings
wantSettings wire.PublishSettings
}{
{
desc: "default settings",
settings: DefaultPublishSettings,
wantSettings: wire.PublishSettings{
DelayThreshold: DefaultPublishSettings.DelayThreshold,
CountThreshold: DefaultPublishSettings.CountThreshold,
ByteThreshold: DefaultPublishSettings.ByteThreshold,
Timeout: DefaultPublishSettings.Timeout,
BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit,
ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod,
Framework: wire.FrameworkCloudPubSubShim,
},
},
{
desc: "zero settings",
settings: PublishSettings{},
wantSettings: DefaultPublishSettings.toWireSettings(),
},
{
desc: "positive values",
settings: PublishSettings{
DelayThreshold: 2,
CountThreshold: 3,
ByteThreshold: 4,
Timeout: 5,
BufferedByteLimit: 6,
},
wantSettings: wire.PublishSettings{
DelayThreshold: 2,
CountThreshold: 3,
ByteThreshold: 4,
Timeout: 5,
BufferedByteLimit: 6,
ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod,
Framework: wire.FrameworkCloudPubSubShim,
},
},
{
desc: "negative values",
settings: PublishSettings{
DelayThreshold: -2,
CountThreshold: -3,
ByteThreshold: -4,
Timeout: -5,
BufferedByteLimit: -6,
},
wantSettings: wire.PublishSettings{
DelayThreshold: -2,
CountThreshold: -3,
ByteThreshold: -4,
Timeout: -5,
BufferedByteLimit: -6,
ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod,
Framework: wire.FrameworkCloudPubSubShim,
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" {
t.Errorf("PublishSettings.toWireSettings() got: -, want: +\n%s", diff)
}
})
}
}

func TestReceiveSettingsToWireSettings(t *testing.T) {
for _, tc := range []struct {
desc string
settings ReceiveSettings
wantSettings wire.ReceiveSettings
}{
{
desc: "default settings",
settings: DefaultReceiveSettings,
wantSettings: wire.ReceiveSettings{
MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages,
MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes,
Timeout: DefaultReceiveSettings.Timeout,
Framework: wire.FrameworkCloudPubSubShim,
},
},
{
desc: "zero settings",
settings: ReceiveSettings{},
wantSettings: DefaultReceiveSettings.toWireSettings(),
},
{
desc: "positive values",
settings: ReceiveSettings{
MaxOutstandingMessages: 2,
MaxOutstandingBytes: 3,
Timeout: 4,
Partitions: []int{5, 6},
},
wantSettings: wire.ReceiveSettings{
MaxOutstandingMessages: 2,
MaxOutstandingBytes: 3,
Timeout: 4,
Partitions: []int{5, 6},
Framework: wire.FrameworkCloudPubSubShim,
},
},
{
desc: "negative values",
settings: ReceiveSettings{
MaxOutstandingMessages: -2,
MaxOutstandingBytes: -3,
Timeout: -4,
Partitions: []int{-5, -6},
},
wantSettings: wire.ReceiveSettings{
MaxOutstandingMessages: -2,
MaxOutstandingBytes: -3,
Timeout: -4,
Partitions: []int{-5, -6},
Framework: wire.FrameworkCloudPubSubShim,
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" {
t.Errorf("ReceiveSettings.toWireSettings() got: -, want: +\n%s", diff)
}
})
}
}

0 comments on commit b88a1b8

Please sign in to comment.