Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): settings and message transforms for Cloud Pub/Sub shim #3281

Merged
merged 26 commits into from Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a3f7f1b
feat(pubsublite): settings and message transforms for Cloud Pub/Sub shim
tmdiep Nov 26, 2020
60a197a
Update comments
tmdiep Nov 26, 2020
02bc46e
Comment nits
tmdiep Nov 27, 2020
00f6b95
Copy ReceiveSettings.Partitions
tmdiep Nov 27, 2020
28ff3a4
Address review comments
tmdiep Nov 30, 2020
c5b609c
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 1, 2020
68015cc
Update go.sum
tmdiep Dec 1, 2020
f0e7b2d
Fix comment formatting for godoc
tmdiep Dec 1, 2020
87e9b3b
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 2, 2020
e2472bc
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 2, 2020
0d5646c
Remove PublisherSettings.ConfigPollPeriod and update comments.
tmdiep Dec 2, 2020
676e849
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 3, 2020
e1baf46
Comment rewording
tmdiep Dec 3, 2020
4934529
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 7, 2020
d5004b1
We decided to remove the 1 MiB per-message publish limit
tmdiep Dec 9, 2020
e02a420
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 9, 2020
36a556c
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 9, 2020
1cbd101
Treat zero settings as default settings. Add unit tests
tmdiep Dec 9, 2020
fe4a06c
Merge branch 'refs/heads/master' into shim_settings
tmdiep Dec 11, 2020
f78cd68
Update to pubsub v1.9.1 and run `go mod tidy`
tmdiep Dec 11, 2020
7ff5e42
Merge branch 'master' and fix conflicts
tmdiep Dec 14, 2020
8fc6bc3
Link to Java message transform impl
tmdiep Dec 14, 2020
7bed62e
Merge branch 'master' into shim_settings
tmdiep Dec 22, 2020
46270cd
Minor comment and error message revisions
tmdiep Dec 22, 2020
e13daa7
Merge branch 'master' into shim_settings
tmdiep Dec 22, 2020
58725a5
Merge branch 'master' into shim_settings
tmdiep Dec 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pubsublite/go.mod
Expand Up @@ -4,6 +4,7 @@ go 1.11

require (
cloud.google.com/go v0.72.0
cloud.google.com/go/pubsub v1.8.3
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.3
github.com/googleapis/gax-go/v2 v2.0.5
Expand Down
10 changes: 10 additions & 0 deletions pubsublite/go.sum
Expand Up @@ -13,6 +13,7 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV
cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.71.0/go.mod h1:qZfY4Y7AEIQwG/fQYD3xrxLNkQZ0Xzf3HGeqCkA6LVM=
cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww=
cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
Expand All @@ -26,7 +27,10 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1 h1:ukjixP1wl0LpnZ6LWtZJ0mX5tBmjp1f8Sqer8Z2OMUU=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.8.3 h1:kl5QdIn98mYhX+G7OzdQ9W3SQ0XXdhHlTw0GHa723pI=
cloud.google.com/go/pubsub v1.8.3/go.mod h1:m8NMRz5lt0YjbQQ40RjocDVRjgYyzyYpP6ix3dxwRno=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
Expand Down Expand Up @@ -199,6 +203,7 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201026091529-146b70c837a4/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -258,6 +263,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down Expand Up @@ -299,6 +305,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201030143252-cf7a54d06671/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201119132711-4783bc9bebf0 h1:26vapYZ9m+DJd68m3DCFP/swNNErXAU7tOMFG7NyUuM=
golang.org/x/tools v0.0.0-20201119132711-4783bc9bebf0/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
Expand All @@ -323,6 +330,7 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
google.golang.org/api v0.34.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/api v0.35.0 h1:TBCmTTxUrRDA1iTctnK/fIeitxIZ+TQuaf0j29fmCGo=
google.golang.org/api v0.35.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
Expand Down Expand Up @@ -362,6 +370,7 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201030142918-24207fddd1c3/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4 h1:Rt0FRalMgdSlXAVJvX4pr65KfqaxHXSLkSJRD9pw6g0=
google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand All @@ -378,6 +387,7 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.33.2 h1:EQyQC3sa8M+p6Ulc8yy9SWSS2GVwyRc83gAbG8lrl4o=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
17 changes: 17 additions & 0 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -34,6 +34,12 @@ const (
MaxPublishRequestBytes = 3500000
)

// FrameworkType is the user-facing API for Cloud Pub/Sub Lite.
type FrameworkType string

// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.
const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"

// PublishSettings control the batching of published messages. These settings
// apply per partition.
type PublishSettings struct {
Expand Down Expand Up @@ -66,6 +72,13 @@ type PublishSettings struct {
// throughput capacity can cause the buffers to overflow. For more
// information, see https://cloud.google.com/pubsub/lite/docs/topics.
BufferedByteLimit int

// The polling interval to watch for topic partition count updates. Set to 0
// to disable polling if the number of partitions will never update.
ConfigPollPeriod time.Duration

// The user-facing API type.
Framework FrameworkType
}

// DefaultPublishSettings holds the default values for PublishSettings.
Expand All @@ -77,6 +90,7 @@ var DefaultPublishSettings = PublishSettings{
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
ConfigPollPeriod: 10 * time.Minute,
}

func validatePublishSettings(settings PublishSettings) error {
Expand Down Expand Up @@ -127,6 +141,9 @@ type ReceiveSettings struct {
// specified, the client will use the partition assignment service to
// determine which partitions it should connect to.
Partitions []int

// The user-facing API type.
Framework FrameworkType
}

// DefaultReceiveSettings holds the default values for ReceiveSettings.
Expand Down
33 changes: 33 additions & 0 deletions pubsublite/ps/doc.go
@@ -0,0 +1,33 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

/*
Package ps contains a publisher and subscriber client for the Cloud Pub/Sub Lite
service that emulates the Cloud Pub/Sub API.

If interfaces are defined for pubsub.Topic.Publish() and
pubsub.Subscription.Receive(), the clients in this package can be used as
drop-in replacements. As noted in comments, the two services have some
differences:

- Pub/Sub Lite does not support nack for messages.
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
- Pub/Sub Lite has publish and subscribe throughput limits. Thus publishing can
be more sensitive to buffer overflow.
- Pub/Sub Lite publisher clients can terminate when an unretryable error occurs.
- DefaultPublishSettings and DefaultReceiveSettings should be used for default
settings rather than their empty types.

For more information about Cloud Pub/Sub Lite, see
https://cloud.google.com/pubsub/lite/docs.
*/
package ps // import "cloud.google.com/go/pubsublite/ps"
134 changes: 134 additions & 0 deletions pubsublite/ps/message.go
@@ -0,0 +1,134 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps

import (
"encoding/base64"
"errors"
"fmt"

"cloud.google.com/go/pubsub"
"github.com/golang/protobuf/ptypes"
"google.golang.org/protobuf/proto"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

const eventTimestampAttributeKey = "x-goog-pubsublite-event-time-timestamp-proto"

var (
errInvalidMessage = errors.New("pubsublite: invalid received message")
)

// Encodes a timestamp in a way that it will be interpreted as an event time if
// published on a message with an attribute named eventTimestampAttributeKey.
func encodeEventTimestamp(eventTime *tspb.Timestamp) (string, error) {
bytes, err := proto.Marshal(eventTime)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(bytes), nil
}

// Decodes a timestamp encoded with encodeEventTimestamp.
func decodeEventTimestamp(value string) (*tspb.Timestamp, error) {
bytes, err := base64.StdEncoding.DecodeString(value)
if err != nil {
return nil, err
}
eventTime := &tspb.Timestamp{}
if err := proto.Unmarshal(bytes, eventTime); err != nil {
return nil, err
}
return eventTime, nil
}

// extractOrderingKey extracts the ordering key from the message for routing
// during publishing. It is the default KeyExtractorFunc implementation.
func extractOrderingKey(msg *pubsub.Message) []byte {
if len(msg.OrderingKey) == 0 {
return nil
}
return []byte(msg.OrderingKey)
}

// transformPublishedMessage is the default PublishMessageTransformerFunc
// implementation.
func transformPublishedMessage(from *pubsub.Message, extractKey KeyExtractorFunc) (*pb.PubSubMessage, error) {
msgpb := &pb.PubSubMessage{
Data: from.Data,
Key: extractKey(from),
}

if len(from.Attributes) > 0 {
msgpb.Attributes = make(map[string]*pb.AttributeValues)
for key, value := range from.Attributes {
if key == eventTimestampAttributeKey {
eventpb, err := decodeEventTimestamp(value)
if err != nil {
return nil, err
}
msgpb.EventTime = eventpb
} else {
msgpb.Attributes[key] = &pb.AttributeValues{Values: [][]byte{[]byte(value)}}
}
}
}
return msgpb, nil
}

// transformReceivedMessage is the default ReceiveMessageTransformerFunc
// implementation.
func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) error {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
if from == nil || from.GetMessage() == nil {
// This should not occur, but guard against nil.
return errInvalidMessage
}

var err error
msg := from.GetMessage()

if from.GetPublishTime() != nil {
if to.PublishTime, err = ptypes.Timestamp(from.GetPublishTime()); err != nil {
return err
}
}
if from.GetCursor() != nil {
to.ID = fmt.Sprintf("%d", from.GetCursor().GetOffset())
}
if len(msg.GetKey()) > 0 {
to.OrderingKey = string(msg.GetKey())
}
to.Data = msg.GetData()
to.Attributes = make(map[string]string)

if msg.EventTime != nil {
val, err := encodeEventTimestamp(msg.EventTime)
if err != nil {
return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err)
}
to.Attributes[eventTimestampAttributeKey] = val
}
for key, values := range msg.Attributes {
if key == eventTimestampAttributeKey {
return fmt.Errorf("%s: reserved attribute with key %s exists in wire message", errInvalidMessage.Error(), eventTimestampAttributeKey)
}
if len(values.Values) > 1 {
return fmt.Errorf("pubsublite: cannot transform wire message with multiple values for attribute %q", key)
}
to.Attributes[key] = string(values.Values[0])
}
return nil
}