Skip to content

Commit

Permalink
feat(pubsublite): settings and message transforms for Cloud Pub/Sub s…
Browse files Browse the repository at this point in the history
…him (#3281)

The shim implements various features in the Pub/Sub Lite Java client library for feature parity, e.g. nack handler, custom message transforms, key extractor, etc. It also implements some features from the Cloud Pub/Sub Go library, e.g. publish buffer byte limit.
  • Loading branch information
tmdiep committed Dec 23, 2020
1 parent 18e3a4f commit 74923c2
Show file tree
Hide file tree
Showing 8 changed files with 716 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pubsublite/go.mod
Expand Up @@ -4,11 +4,11 @@ go 1.11

require (
cloud.google.com/go v0.74.0
cloud.google.com/go/pubsub v1.9.1
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.4
github.com/google/uuid v1.1.2
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 // indirect
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a
google.golang.org/grpc v1.34.0
Expand Down
14 changes: 12 additions & 2 deletions pubsublite/go.sum
Expand Up @@ -15,6 +15,7 @@ cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOY
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww=
cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI=
cloud.google.com/go v0.73.0/go.mod h1:BkDh9dFvGjCitVw03TNjKbBxXNKULXXIq6orU6HrJ4Q=
cloud.google.com/go v0.74.0 h1:kpgPA77kSSbjSs+fWHkPTxQ6J5Z2Qkruo5jfXEkHxNQ=
cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
Expand All @@ -29,6 +30,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.9.1 h1:hXEte3a/Brd+Tl9ecEkHH3ow9wpnOTZ28lSOszYj6Cg=
cloud.google.com/go/pubsub v1.9.1/go.mod h1:7QTUeCiy+P1dVPO8hHVbZSHDfibbgm1gbKyOVYnqb8g=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
Expand Down Expand Up @@ -105,6 +108,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201117184057-ae444373da19/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
Expand Down Expand Up @@ -212,6 +216,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjN
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 h1:lwlPPsmjDKK0J6eG6xDWd5XPehI0R024zxjDnw3esPA=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand All @@ -235,6 +240,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -279,6 +286,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down Expand Up @@ -322,9 +330,9 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2 h1:vEtypaVub6UvKkiXZ2xx9QIvp9TL7sI7xp7vdi2kezA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 h1:9j/upNXDRpADUw2RpUfJ7E7GHtfhDih62kX6JM8vs2c=
golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -391,6 +399,8 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201203001206-6486ece9c497/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a h1:GnJAhasbD8HiT8DZMvsEx3QLVy/X0icq/MGr0MqRJ2M=
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down
14 changes: 13 additions & 1 deletion pubsublite/internal/wire/settings.go
Expand Up @@ -31,9 +31,15 @@ const (
// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
MaxPublishRequestBytes = 3500000
MaxPublishRequestBytes int = 3.5 * 1024 * 1024
)

// FrameworkType is the user-facing API for Cloud Pub/Sub Lite.
type FrameworkType string

// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.
const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"

// PublishSettings control the batching of published messages. These settings
// apply per partition.
type PublishSettings struct {
Expand Down Expand Up @@ -70,6 +76,9 @@ type PublishSettings struct {
// The polling interval to watch for topic partition count updates. Set to 0
// to disable polling if the number of partitions will never update.
ConfigPollPeriod time.Duration

// The user-facing API type.
Framework FrameworkType
}

// DefaultPublishSettings holds the default values for PublishSettings.
Expand Down Expand Up @@ -132,6 +141,9 @@ type ReceiveSettings struct {
// specified, the client will use the partition assignment service to
// determine which partitions it should connect to.
Partitions []int

// The user-facing API type.
Framework FrameworkType
}

// DefaultReceiveSettings holds the default values for ReceiveSettings.
Expand Down
43 changes: 43 additions & 0 deletions pubsublite/ps/doc.go
@@ -0,0 +1,43 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

/*
Package ps contains clients for publishing and subscribing using the Google
Cloud Pub/Sub Lite service.
If interfaces are defined, PublisherClient and SubscriberClient can be used as
substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(),
respectively, from the pubsub package.
As noted in comments, the two services have some differences:
- Pub/Sub Lite does not support nack for messages. By default, this will
terminate the SubscriberClient. A custom function can be provided for
ReceiveSettings.NackHandler to handle nacked messages.
- Pub/Sub Lite has no concept of ack expiration. Subscribers must ack or nack
every message received.
- Pub/Sub Lite PublisherClients can terminate when an unretryable error
occurs.
- Publishers and subscribers will be throttled if Pub/Sub Lite publish or
subscribe throughput limits are exceeded. Thus publishing can be more
sensitive to buffer overflow than Cloud Pub/Sub.
More information about Google Cloud Pub/Sub Lite is available at
https://cloud.google.com/pubsub/lite.
Information about choosing between Google Cloud Pub/Sub vs Pub/Sub Lite is
available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
See https://godoc.org/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
*/
package ps // import "cloud.google.com/go/pubsublite/ps"
133 changes: 133 additions & 0 deletions pubsublite/ps/message.go
@@ -0,0 +1,133 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps

import (
"encoding/base64"
"errors"
"fmt"

"cloud.google.com/go/pubsub"
"github.com/golang/protobuf/ptypes"
"google.golang.org/protobuf/proto"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// Message transforms and event timestamp encoding mirrors the Java client
// library implementation:
// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransforms.java
const eventTimestampAttributeKey = "x-goog-pubsublite-event-time-timestamp-proto"

var errInvalidMessage = errors.New("pubsublite: invalid received message")

// Encodes a timestamp in a way that it will be interpreted as an event time if
// published on a message with an attribute named eventTimestampAttributeKey.
func encodeEventTimestamp(eventTime *tspb.Timestamp) (string, error) {
bytes, err := proto.Marshal(eventTime)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(bytes), nil
}

// Decodes a timestamp encoded with encodeEventTimestamp.
func decodeEventTimestamp(value string) (*tspb.Timestamp, error) {
bytes, err := base64.StdEncoding.DecodeString(value)
if err != nil {
return nil, err
}
eventTime := &tspb.Timestamp{}
if err := proto.Unmarshal(bytes, eventTime); err != nil {
return nil, err
}
return eventTime, nil
}

// extractOrderingKey extracts the ordering key from the message for routing
// during publishing. It is the default KeyExtractorFunc implementation.
func extractOrderingKey(msg *pubsub.Message) []byte {
if len(msg.OrderingKey) == 0 {
return nil
}
return []byte(msg.OrderingKey)
}

// transformPublishedMessage is the default PublishMessageTransformerFunc
// implementation.
func transformPublishedMessage(from *pubsub.Message, to *pb.PubSubMessage, extractKey KeyExtractorFunc) error {
to.Data = from.Data
to.Key = extractKey(from)

if len(from.Attributes) > 0 {
to.Attributes = make(map[string]*pb.AttributeValues)
for key, value := range from.Attributes {
if key == eventTimestampAttributeKey {
eventpb, err := decodeEventTimestamp(value)
if err != nil {
return err
}
to.EventTime = eventpb
} else {
to.Attributes[key] = &pb.AttributeValues{Values: [][]byte{[]byte(value)}}
}
}
}
return nil
}

// transformReceivedMessage is the default ReceiveMessageTransformerFunc
// implementation.
func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) error {
if from == nil || from.GetMessage() == nil {
// This should not occur, but guard against nil.
return errInvalidMessage
}

var err error
msg := from.GetMessage()

if from.GetPublishTime() != nil {
if to.PublishTime, err = ptypes.Timestamp(from.GetPublishTime()); err != nil {
return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err)
}
}
if from.GetCursor() != nil {
to.ID = fmt.Sprintf("%d", from.GetCursor().GetOffset())
}
if len(msg.GetKey()) > 0 {
to.OrderingKey = string(msg.GetKey())
}
to.Data = msg.GetData()
to.Attributes = make(map[string]string)

if msg.EventTime != nil {
val, err := encodeEventTimestamp(msg.EventTime)
if err != nil {
return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err)
}
to.Attributes[eventTimestampAttributeKey] = val
}
for key, values := range msg.Attributes {
if key == eventTimestampAttributeKey {
return fmt.Errorf("%s: attribute with reserved key %q exists in API message", errInvalidMessage.Error(), eventTimestampAttributeKey)
}
if len(values.Values) > 1 {
return fmt.Errorf("%s: cannot transform API message with multiple values for attribute with key %q", errInvalidMessage.Error(), key)
}
to.Attributes[key] = string(values.Values[0])
}
return nil
}

0 comments on commit 74923c2

Please sign in to comment.