Skip to content

Commit 9eb9fcb

Browse files
authored
feat(pubsublite): Publish settings and errors (#3075)
PublishSettings are almost identical to [pubsub.PublishSettings](https://godoc.org/cloud.google.com/go/pubsub#PublishSettings). Some max thresholds were changed to be consistent with the [Pub/Sub Lite Java client library](https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java).
1 parent 97cfd45 commit 9eb9fcb

File tree

3 files changed

+239
-0
lines changed

3 files changed

+239
-0
lines changed

pubsublite/errors.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
14+
package pubsublite
15+
16+
import "errors"
17+
18+
var (
19+
// ErrOverflow indicates that the publish buffers have overflowed. See
20+
// comments for PublishSettings.BufferedByteLimit.
21+
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")
22+
)

pubsublite/settings.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
14+
package pubsublite
15+
16+
import (
17+
"errors"
18+
"fmt"
19+
"time"
20+
)
21+
22+
const (
23+
// MaxPublishRequestCount is the maximum number of messages that can be
24+
// batched in a single publish request.
25+
MaxPublishRequestCount = 1000
26+
27+
// MaxPublishMessageBytes is the maximum allowed serialized size of a single
28+
// Pub/Sub message in bytes.
29+
MaxPublishMessageBytes = 1000000
30+
31+
// MaxPublishRequestBytes is the maximum allowed serialized size of a single
32+
// publish request (containing a batch of messages) in bytes.
33+
MaxPublishRequestBytes = 3500000
34+
)
35+
36+
// PublishSettings control the batching of published messages.
37+
type PublishSettings struct {
38+
// Publish a non-empty batch after this delay has passed. Must be > 0.
39+
DelayThreshold time.Duration
40+
41+
// Publish a batch when it has this many messages. Must be > 0. The maximum is
42+
// MaxPublishRequestCount.
43+
CountThreshold int
44+
45+
// Publish a batch when its size in bytes reaches this value. Must be > 0. The
46+
// maximum is MaxPublishRequestBytes.
47+
ByteThreshold int
48+
49+
// The maximum time that the client will attempt to establish a publish stream
50+
// connection to the server. Must be > 0.
51+
//
52+
// The timeout is exceeded, the publisher will terminate with the last error
53+
// that occurred while trying to reconnect. Note that if the timeout duration
54+
// is long, ErrOverflow may occur first.
55+
Timeout time.Duration
56+
57+
// The maximum number of bytes that the publisher will keep in memory before
58+
// returning ErrOverflow. Must be > 0.
59+
//
60+
// Note that Pub/Sub Lite topics are provisioned a publishing throughput
61+
// capacity, per partition, shared by all publisher clients. Setting a large
62+
// buffer size can mitigate transient publish spikes. However, consistently
63+
// attempting to publish messages at a much higher rate than the publishing
64+
// throughput capacity can cause the buffers to overflow. For more
65+
// information, see https://cloud.google.com/pubsub/lite/docs/topics.
66+
BufferedByteLimit int
67+
}
68+
69+
// DefaultPublishSettings holds the default values for PublishSettings.
70+
var DefaultPublishSettings = PublishSettings{
71+
DelayThreshold: 10 * time.Millisecond,
72+
CountThreshold: 100,
73+
ByteThreshold: 1e6,
74+
Timeout: 60 * time.Second,
75+
// By default set to a high limit that is not likely to occur, but prevents
76+
// OOM errors in clients.
77+
BufferedByteLimit: 1 << 30, // 1 GiB
78+
}
79+
80+
func validatePublishSettings(settings PublishSettings) error {
81+
if settings.DelayThreshold <= 0 {
82+
return errors.New("pubsublite: invalid publish settings. DelayThreshold duration must be > 0")
83+
}
84+
if settings.Timeout <= 0 {
85+
return errors.New("pubsublite: invalid publish settings. Timeout duration must be > 0")
86+
}
87+
if settings.CountThreshold <= 0 {
88+
return errors.New("pubsublite: invalid publish settings. CountThreshold must be > 0")
89+
}
90+
if settings.CountThreshold > MaxPublishRequestCount {
91+
return fmt.Errorf("pubsublite: invalid publish settings. Maximum CountThreshold is MaxPublishRequestCount (%d)", MaxPublishRequestCount)
92+
}
93+
if settings.ByteThreshold <= 0 {
94+
return errors.New("pubsublite: invalid publish settings. ByteThreshold must be > 0")
95+
}
96+
if settings.ByteThreshold > MaxPublishRequestBytes {
97+
return fmt.Errorf("pubsublite: invalid publish settings. Maximum ByteThreshold is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)
98+
}
99+
if settings.BufferedByteLimit <= 0 {
100+
return errors.New("pubsublite: invalid publish settings. BufferedByteLimit must be > 0")
101+
}
102+
return nil
103+
}

pubsublite/settings_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
14+
package pubsublite
15+
16+
import (
17+
"testing"
18+
"time"
19+
)
20+
21+
func TestValidatePublishSettings(t *testing.T) {
22+
for _, tc := range []struct {
23+
desc string
24+
// settingsFunc is passed DefaultPublishSettings
25+
settingsFunc func(settings PublishSettings) PublishSettings
26+
wantErr bool
27+
}{
28+
{
29+
desc: "valid: default",
30+
settingsFunc: func(settings PublishSettings) PublishSettings {
31+
return DefaultPublishSettings
32+
},
33+
wantErr: false,
34+
},
35+
{
36+
desc: "valid: max",
37+
settingsFunc: func(settings PublishSettings) PublishSettings {
38+
return PublishSettings{
39+
CountThreshold: MaxPublishRequestCount,
40+
ByteThreshold: MaxPublishRequestBytes,
41+
// These have no max bounds, check large values.
42+
DelayThreshold: 24 * time.Hour,
43+
Timeout: 24 * time.Hour,
44+
BufferedByteLimit: 1e10,
45+
}
46+
},
47+
wantErr: false,
48+
},
49+
{
50+
desc: "invalid: zero CountThreshold",
51+
settingsFunc: func(settings PublishSettings) PublishSettings {
52+
settings.CountThreshold = 0
53+
return settings
54+
},
55+
wantErr: true,
56+
},
57+
{
58+
desc: "invalid: CountThreshold over MaxPublishRequestCount",
59+
settingsFunc: func(settings PublishSettings) PublishSettings {
60+
settings.CountThreshold = MaxPublishRequestCount + 1
61+
return settings
62+
},
63+
wantErr: true,
64+
},
65+
{
66+
desc: "invalid: ByteThreshold over MaxPublishRequestBytes",
67+
settingsFunc: func(settings PublishSettings) PublishSettings {
68+
settings.ByteThreshold = MaxPublishRequestBytes + 1
69+
return settings
70+
},
71+
wantErr: true,
72+
},
73+
{
74+
desc: "invalid: zero ByteThreshold",
75+
settingsFunc: func(settings PublishSettings) PublishSettings {
76+
settings.ByteThreshold = 0
77+
return settings
78+
},
79+
wantErr: true,
80+
},
81+
{
82+
desc: "invalid: zero DelayThreshold",
83+
settingsFunc: func(settings PublishSettings) PublishSettings {
84+
settings.DelayThreshold = time.Duration(0)
85+
return settings
86+
},
87+
wantErr: true,
88+
},
89+
{
90+
desc: "invalid: zero Timeout",
91+
settingsFunc: func(settings PublishSettings) PublishSettings {
92+
settings.Timeout = time.Duration(0)
93+
return settings
94+
},
95+
wantErr: true,
96+
},
97+
{
98+
desc: "invalid: zero BufferedByteLimit",
99+
settingsFunc: func(settings PublishSettings) PublishSettings {
100+
settings.BufferedByteLimit = 0
101+
return settings
102+
},
103+
wantErr: true,
104+
},
105+
} {
106+
t.Run(tc.desc, func(t *testing.T) {
107+
settings := tc.settingsFunc(DefaultPublishSettings)
108+
gotErr := validatePublishSettings(settings)
109+
if (gotErr != nil) != tc.wantErr {
110+
t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
111+
}
112+
})
113+
}
114+
}

0 commit comments

Comments
 (0)