-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
settings.go
185 lines (156 loc) · 7.35 KB
/
settings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package ps
import (
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
const (
// MaxPublishRequestCount is the maximum number of messages that can be
// batched in a single publish request.
MaxPublishRequestCount = wire.MaxPublishRequestCount
// MaxPublishMessageBytes is the maximum allowed serialized size of a single
// Pub/Sub message in bytes.
MaxPublishMessageBytes = wire.MaxPublishMessageBytes
// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes.
MaxPublishRequestBytes = wire.MaxPublishRequestBytes
)
// KeyExtractorFunc is a function that extracts an ordering key from a Message.
type KeyExtractorFunc func(*pubsub.Message) []byte
// PublishMessageTransformerFunc transforms a pubsub.Message to a PubSubMessage
// API proto. If this returns an error, the pubsub.PublishResult will be
// errored and the PublisherClient will consider this a fatal error and
// terminate.
type PublishMessageTransformerFunc func(*pubsub.Message) (*pb.PubSubMessage, error)
// PublishSettings control the batching of published messages. These settings
// apply per partition.
//
// Use DefaultPublishSettings for defaults, as an empty PublishSettings will
// fail validation.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed. Must be > 0.
DelayThreshold time.Duration
// Publish a batch when it has this many messages. Must be > 0. The maximum is
// MaxPublishRequestCount.
CountThreshold int
// Publish a batch when its size in bytes reaches this value. Must be > 0. The
// maximum is MaxPublishRequestBytes.
ByteThreshold int
// The maximum time that the client will attempt to establish a publish stream
// connection to the server. Must be > 0.
//
// The timeout is exceeded, the publisher will terminate with the last error
// that occurred while trying to reconnect. Note that if the timeout duration
// is long, ErrOverflow may occur first.
Timeout time.Duration
// The maximum number of bytes that the publisher will keep in memory before
// returning ErrOverflow. Must be > 0.
//
// Note that Pub/Sub Lite topics are provisioned a publishing throughput
// capacity, per partition, shared by all publisher clients. Setting a large
// buffer size can mitigate transient publish spikes. However, consistently
// attempting to publish messages at a much higher rate than the publishing
// throughput capacity can cause the buffers to overflow. For more
// information, see https://cloud.google.com/pubsub/lite/docs/topics.
BufferedByteLimit int
// The polling interval to watch for topic partition count updates. Set to 0
// to disable polling if the number of partitions will never update.
ConfigPollPeriod time.Duration
// Optional custom function that extracts an ordering key from a Message. The
// default implementation extracts the key from Message.OrderingKey.
KeyExtractor KeyExtractorFunc
// Optional custom function that transforms a pubsub.Message to a
// PubSubMessage API proto.
MessageTransformer PublishMessageTransformerFunc
}
// DefaultPublishSettings holds the default values for PublishSettings.
var DefaultPublishSettings = PublishSettings{
DelayThreshold: wire.DefaultPublishSettings.DelayThreshold,
CountThreshold: wire.DefaultPublishSettings.CountThreshold,
ByteThreshold: wire.DefaultPublishSettings.ByteThreshold,
Timeout: wire.DefaultPublishSettings.Timeout,
BufferedByteLimit: wire.DefaultPublishSettings.BufferedByteLimit,
ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod,
}
func (s *PublishSettings) toWireSettings() wire.PublishSettings {
return wire.PublishSettings{
DelayThreshold: s.DelayThreshold,
CountThreshold: s.CountThreshold,
ByteThreshold: s.ByteThreshold,
Timeout: s.Timeout,
BufferedByteLimit: s.BufferedByteLimit,
ConfigPollPeriod: s.ConfigPollPeriod,
Framework: wire.FrameworkCloudPubSubShim,
}
}
// NackHandler is invoked when pubsub.Message.Nack() is called. Cloud Pub/Sub
// Lite does not have a concept of 'nack'. If the nack handler implementation
// returns nil, the message is acknowledged. If an error is returned, the
// SubscriberClient will will consider this a fatal error and terminate.
//
// In Cloud Pub/Sub Lite, only a single subscriber for a given subscription is
// connected to any partition at a time, and there is no other client that may
// be able to handle messages.
type NackHandler func(*pubsub.Message) error
// ReceiveMessageTransformerFunc transforms a PubSubMessage API proto to a
// pubsub.Message. If this returns an error, the SubscriberClient will consider
// this a fatal error and terminate.
type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error
// ReceiveSettings configure the Receive method. These settings apply per
// partition.
//
// Use DefaultReceiveSettings for defaults, as an empty ReceiveSettings will
// fail validation.
type ReceiveSettings struct {
// MaxOutstandingMessages is the maximum number of unacknowledged messages.
// Must be > 0.
MaxOutstandingMessages int
// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
// messages. Must be > 0.
MaxOutstandingBytes int
// The maximum time that the client will attempt to establish a subscribe
// stream connection to the server. Must be > 0.
//
// The timeout is exceeded, the SubscriberClient will terminate with the last
// error that occurred while trying to reconnect.
Timeout time.Duration
// The topic partition numbers (zero-indexed) to receive messages from.
// Values must be less than the number of partitions for the topic. If not
// specified, the SubscriberClient will use the partition assignment service
// to determine which partitions it should connect to.
Partitions []int
// Optional custom function to handle pubsub.Message.Nack() calls. If not set,
// the default behavior is to immediately terminate the SubscriberClient with
// a fatal error.
NackHandler NackHandler
// Optional custom function that transforms a PubSubMessage API proto to a
// pubsub.Message.
MessageTransformer ReceiveMessageTransformerFunc
}
// DefaultReceiveSettings holds the default values for ReceiveSettings.
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: wire.DefaultReceiveSettings.MaxOutstandingMessages,
MaxOutstandingBytes: wire.DefaultReceiveSettings.MaxOutstandingBytes,
Timeout: wire.DefaultReceiveSettings.Timeout,
}
func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings {
return wire.ReceiveSettings{
MaxOutstandingMessages: s.MaxOutstandingMessages,
MaxOutstandingBytes: s.MaxOutstandingBytes,
Timeout: s.Timeout,
Framework: wire.FrameworkCloudPubSubShim,
}
}