diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 89973e20cc7..22309bf0107 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -4,11 +4,11 @@ go 1.11 require ( cloud.google.com/go v0.74.0 + cloud.google.com/go/pubsub v1.9.1 github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.4 github.com/google/uuid v1.1.2 github.com/googleapis/gax-go/v2 v2.0.5 - golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 // indirect google.golang.org/api v0.36.0 google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a google.golang.org/grpc v1.34.0 diff --git a/pubsublite/go.sum b/pubsublite/go.sum index fdd5e2ebd1a..ec15514746d 100644 --- a/pubsublite/go.sum +++ b/pubsublite/go.sum @@ -15,6 +15,7 @@ cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOY cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww= cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= +cloud.google.com/go v0.73.0/go.mod h1:BkDh9dFvGjCitVw03TNjKbBxXNKULXXIq6orU6HrJ4Q= cloud.google.com/go v0.74.0 h1:kpgPA77kSSbjSs+fWHkPTxQ6J5Z2Qkruo5jfXEkHxNQ= cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= @@ -29,6 +30,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= +cloud.google.com/go/pubsub v1.9.1 h1:hXEte3a/Brd+Tl9ecEkHH3ow9wpnOTZ28lSOszYj6Cg= +cloud.google.com/go/pubsub v1.9.1/go.mod h1:7QTUeCiy+P1dVPO8hHVbZSHDfibbgm1gbKyOVYnqb8g= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= @@ -105,6 +108,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20201117184057-ae444373da19/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= @@ -212,6 +216,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjN golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 h1:lwlPPsmjDKK0J6eG6xDWd5XPehI0R024zxjDnw3esPA= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -235,6 +240,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -279,6 +286,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -322,9 +330,9 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201208233053-a543418bbed2 h1:vEtypaVub6UvKkiXZ2xx9QIvp9TL7sI7xp7vdi2kezA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 h1:9j/upNXDRpADUw2RpUfJ7E7GHtfhDih62kX6JM8vs2c= -golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -391,6 +399,8 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20201203001206-6486ece9c497/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a h1:GnJAhasbD8HiT8DZMvsEx3QLVy/X0icq/MGr0MqRJ2M= google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index e2cc5699746..f1dcd6cb0ea 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -31,9 +31,15 @@ const ( // MaxPublishRequestBytes is the maximum allowed serialized size of a single // publish request (containing a batch of messages) in bytes. Must be lower // than the gRPC limit of 4 MiB. - MaxPublishRequestBytes = 3500000 + MaxPublishRequestBytes int = 3.5 * 1024 * 1024 ) +// FrameworkType is the user-facing API for Cloud Pub/Sub Lite. +type FrameworkType string + +// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub. +const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM" + // PublishSettings control the batching of published messages. These settings // apply per partition. type PublishSettings struct { @@ -70,6 +76,9 @@ type PublishSettings struct { // The polling interval to watch for topic partition count updates. Set to 0 // to disable polling if the number of partitions will never update. ConfigPollPeriod time.Duration + + // The user-facing API type. + Framework FrameworkType } // DefaultPublishSettings holds the default values for PublishSettings. @@ -132,6 +141,9 @@ type ReceiveSettings struct { // specified, the client will use the partition assignment service to // determine which partitions it should connect to. Partitions []int + + // The user-facing API type. + Framework FrameworkType } // DefaultReceiveSettings holds the default values for ReceiveSettings. diff --git a/pubsublite/ps/doc.go b/pubsublite/ps/doc.go new file mode 100644 index 00000000000..43b80de96ea --- /dev/null +++ b/pubsublite/ps/doc.go @@ -0,0 +1,43 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +/* +Package ps contains clients for publishing and subscribing using the Google +Cloud Pub/Sub Lite service. + +If interfaces are defined, PublisherClient and SubscriberClient can be used as +substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(), +respectively, from the pubsub package. + +As noted in comments, the two services have some differences: + - Pub/Sub Lite does not support nack for messages. By default, this will + terminate the SubscriberClient. A custom function can be provided for + ReceiveSettings.NackHandler to handle nacked messages. + - Pub/Sub Lite has no concept of ack expiration. Subscribers must ack or nack + every message received. + - Pub/Sub Lite PublisherClients can terminate when an unretryable error + occurs. + - Publishers and subscribers will be throttled if Pub/Sub Lite publish or + subscribe throughput limits are exceeded. Thus publishing can be more + sensitive to buffer overflow than Cloud Pub/Sub. + +More information about Google Cloud Pub/Sub Lite is available at +https://cloud.google.com/pubsub/lite. + +Information about choosing between Google Cloud Pub/Sub vs Pub/Sub Lite is +available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite. + +See https://godoc.org/cloud.google.com/go for authentication, timeouts, +connection pooling and similar aspects of this package. +*/ +package ps // import "cloud.google.com/go/pubsublite/ps" diff --git a/pubsublite/ps/message.go b/pubsublite/ps/message.go new file mode 100644 index 00000000000..011b4b303b2 --- /dev/null +++ b/pubsublite/ps/message.go @@ -0,0 +1,133 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "encoding/base64" + "errors" + "fmt" + + "cloud.google.com/go/pubsub" + "github.com/golang/protobuf/ptypes" + "google.golang.org/protobuf/proto" + + tspb "github.com/golang/protobuf/ptypes/timestamp" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// Message transforms and event timestamp encoding mirrors the Java client +// library implementation: +// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransforms.java +const eventTimestampAttributeKey = "x-goog-pubsublite-event-time-timestamp-proto" + +var errInvalidMessage = errors.New("pubsublite: invalid received message") + +// Encodes a timestamp in a way that it will be interpreted as an event time if +// published on a message with an attribute named eventTimestampAttributeKey. +func encodeEventTimestamp(eventTime *tspb.Timestamp) (string, error) { + bytes, err := proto.Marshal(eventTime) + if err != nil { + return "", err + } + return base64.StdEncoding.EncodeToString(bytes), nil +} + +// Decodes a timestamp encoded with encodeEventTimestamp. +func decodeEventTimestamp(value string) (*tspb.Timestamp, error) { + bytes, err := base64.StdEncoding.DecodeString(value) + if err != nil { + return nil, err + } + eventTime := &tspb.Timestamp{} + if err := proto.Unmarshal(bytes, eventTime); err != nil { + return nil, err + } + return eventTime, nil +} + +// extractOrderingKey extracts the ordering key from the message for routing +// during publishing. It is the default KeyExtractorFunc implementation. +func extractOrderingKey(msg *pubsub.Message) []byte { + if len(msg.OrderingKey) == 0 { + return nil + } + return []byte(msg.OrderingKey) +} + +// transformPublishedMessage is the default PublishMessageTransformerFunc +// implementation. +func transformPublishedMessage(from *pubsub.Message, to *pb.PubSubMessage, extractKey KeyExtractorFunc) error { + to.Data = from.Data + to.Key = extractKey(from) + + if len(from.Attributes) > 0 { + to.Attributes = make(map[string]*pb.AttributeValues) + for key, value := range from.Attributes { + if key == eventTimestampAttributeKey { + eventpb, err := decodeEventTimestamp(value) + if err != nil { + return err + } + to.EventTime = eventpb + } else { + to.Attributes[key] = &pb.AttributeValues{Values: [][]byte{[]byte(value)}} + } + } + } + return nil +} + +// transformReceivedMessage is the default ReceiveMessageTransformerFunc +// implementation. +func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) error { + if from == nil || from.GetMessage() == nil { + // This should not occur, but guard against nil. + return errInvalidMessage + } + + var err error + msg := from.GetMessage() + + if from.GetPublishTime() != nil { + if to.PublishTime, err = ptypes.Timestamp(from.GetPublishTime()); err != nil { + return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err) + } + } + if from.GetCursor() != nil { + to.ID = fmt.Sprintf("%d", from.GetCursor().GetOffset()) + } + if len(msg.GetKey()) > 0 { + to.OrderingKey = string(msg.GetKey()) + } + to.Data = msg.GetData() + to.Attributes = make(map[string]string) + + if msg.EventTime != nil { + val, err := encodeEventTimestamp(msg.EventTime) + if err != nil { + return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err) + } + to.Attributes[eventTimestampAttributeKey] = val + } + for key, values := range msg.Attributes { + if key == eventTimestampAttributeKey { + return fmt.Errorf("%s: attribute with reserved key %q exists in API message", errInvalidMessage.Error(), eventTimestampAttributeKey) + } + if len(values.Values) > 1 { + return fmt.Errorf("%s: cannot transform API message with multiple values for attribute with key %q", errInvalidMessage.Error(), key) + } + to.Attributes[key] = string(values.Values[0]) + } + return nil +} diff --git a/pubsublite/ps/message_test.go b/pubsublite/ps/message_test.go new file mode 100644 index 00000000000..aec1838c7cb --- /dev/null +++ b/pubsublite/ps/message_test.go @@ -0,0 +1,145 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "log" + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsub" + "github.com/google/go-cmp/cmp/cmpopts" + + tspb "github.com/golang/protobuf/ptypes/timestamp" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +func encodeTimestamp(seconds int64, nanos int32) string { + val, err := encodeEventTimestamp(&tspb.Timestamp{ + Seconds: seconds, + Nanos: nanos, + }) + if err != nil { + log.Fatal(err) + } + return val +} + +func TestMessageTransforms(t *testing.T) { + for _, tc := range []struct { + desc string + wireMsg *pb.SequencedMessage + wantMsg *pubsub.Message + wantErr bool + }{ + { + desc: "valid: full message", + wireMsg: &pb.SequencedMessage{ + Cursor: &pb.Cursor{Offset: 10}, + PublishTime: &tspb.Timestamp{ + Seconds: 1577836800, + Nanos: 900800700, + }, + Message: &pb.PubSubMessage{ + Data: []byte("foo"), + Key: []byte("bar"), + EventTime: &tspb.Timestamp{ + Seconds: 1577836800, + Nanos: 500400300, + }, + Attributes: map[string]*pb.AttributeValues{ + "attr1": {Values: [][]byte{[]byte("hello")}}, + "attr2": {Values: [][]byte{[]byte("world")}}, + }, + }, + }, + wantMsg: &pubsub.Message{ + ID: "10", + PublishTime: time.Unix(1577836800, 900800700), + Data: []byte("foo"), + OrderingKey: "bar", + Attributes: map[string]string{ + "attr1": "hello", + "attr2": "world", + "x-goog-pubsublite-event-time-timestamp-proto": encodeTimestamp(1577836800, 500400300), + }, + }, + }, + { + desc: "valid: minimum", + wireMsg: &pb.SequencedMessage{ + Message: &pb.PubSubMessage{}, + }, + wantMsg: &pubsub.Message{}, + }, + { + desc: "invalid: sequenced message nil", + wantErr: true, + }, + { + desc: "invalid: pubsubmessage nil", + wireMsg: &pb.SequencedMessage{}, + wantErr: true, + }, + { + desc: "invalid: multiple attribute values", + wireMsg: &pb.SequencedMessage{ + Message: &pb.PubSubMessage{ + Attributes: map[string]*pb.AttributeValues{ + "attr1": {Values: [][]byte{[]byte("hello"), []byte("bar")}}, + }, + }, + }, + wantErr: true, + }, + { + desc: "invalid: event time is attribute", + wireMsg: &pb.SequencedMessage{ + Message: &pb.PubSubMessage{ + Attributes: map[string]*pb.AttributeValues{ + "x-goog-pubsublite-event-time-timestamp-proto": { + Values: [][]byte{[]byte(encodeTimestamp(1577836800, 500400300))}, + }, + }, + }, + }, + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotRecvMsg := new(pubsub.Message) + gotErr := transformReceivedMessage(tc.wireMsg, gotRecvMsg) + if (gotErr != nil) != tc.wantErr { + t.Errorf("transformReceivedMessage() err = (%v), want err=%v", gotErr, tc.wantErr) + } + + if tc.wantMsg != nil { + if diff := testutil.Diff(gotRecvMsg, tc.wantMsg, cmpopts.IgnoreUnexported(pubsub.Message{}), cmpopts.EquateEmpty()); diff != "" { + t.Errorf("transformReceivedMessage() got: -, want: +\n%s", diff) + } + + // Check reverse conversion equals input. + gotPubMsg := new(pb.PubSubMessage) + gotErr := transformPublishedMessage(tc.wantMsg, gotPubMsg, extractOrderingKey) + if gotErr != nil { + t.Errorf("transformPublishedMessage() err = (%v)", gotErr) + } + if diff := testutil.Diff(gotPubMsg, tc.wireMsg.Message, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("transformPublishedMessage() got: -, want: +\n%s", diff) + } + } + }) + } +} diff --git a/pubsublite/ps/settings.go b/pubsublite/ps/settings.go new file mode 100644 index 00000000000..6a8be9d1bb5 --- /dev/null +++ b/pubsublite/ps/settings.go @@ -0,0 +1,215 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "time" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite/internal/wire" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +const ( + // MaxPublishRequestCount is the maximum number of messages that can be + // batched in a single publish request. + MaxPublishRequestCount = wire.MaxPublishRequestCount + + // MaxPublishRequestBytes is the maximum allowed serialized size of a single + // publish request (containing a batch of messages) in bytes. + MaxPublishRequestBytes = wire.MaxPublishRequestBytes +) + +// KeyExtractorFunc is a function that extracts an ordering key from a Message. +type KeyExtractorFunc func(*pubsub.Message) []byte + +// PublishMessageTransformerFunc transforms a pubsub.Message to a Pub/Sub Lite +// PubSubMessage API proto. If this returns an error, the pubsub.PublishResult +// will be errored and the PublisherClient will consider this a fatal error and +// terminate. +type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error + +// PublishSettings configure the PublisherClient. These settings apply per +// partition. If BufferedByteLimit is being used to bound memory usage, keep in +// mind the number of partitions in the topic. +// +// A zero PublishSettings will result in values equivalent to +// DefaultPublishSettings. +type PublishSettings struct { + // Publish a non-empty batch after this delay has passed. If DelayThreshold is + // 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise + // must be > 0. + DelayThreshold time.Duration + + // Publish a batch when it has this many messages. The maximum is + // MaxPublishRequestCount. If CountThreshold is 0, it will be treated as + // DefaultPublishSettings.CountThreshold. Otherwise must be > 0. + CountThreshold int + + // Publish a batch when its size in bytes reaches this value. The maximum is + // MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as + // DefaultPublishSettings.ByteThreshold. Otherwise must be > 0. + ByteThreshold int + + // The maximum time that the client will attempt to establish a publish stream + // connection to the server. If Timeout is 0, it will be treated as + // DefaultPublishSettings.Timeout. Otherwise must be > 0. + // + // The timeout is exceeded, the publisher will terminate with the last error + // that occurred while trying to reconnect. Note that if the timeout duration + // is long, ErrOverflow may occur first. + Timeout time.Duration + + // The maximum number of bytes that the publisher will keep in memory before + // returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as + // DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0. + // + // Note that Pub/Sub Lite topics are provisioned a publishing throughput + // capacity, per partition, shared by all publisher clients. Setting a large + // buffer size can mitigate transient publish spikes. However, consistently + // attempting to publish messages at a much higher rate than the publishing + // throughput capacity can cause the buffers to overflow. For more + // information, see https://cloud.google.com/pubsub/lite/docs/topics. + BufferedByteLimit int + + // Optional custom function that extracts an ordering key from a Message. The + // default implementation extracts the key from Message.OrderingKey. + KeyExtractor KeyExtractorFunc + + // Optional custom function that transforms a pubsub.Message to a + // PubSubMessage API proto. + MessageTransformer PublishMessageTransformerFunc +} + +// DefaultPublishSettings holds the default values for PublishSettings. +var DefaultPublishSettings = PublishSettings{ + DelayThreshold: 10 * time.Millisecond, + CountThreshold: 100, + ByteThreshold: 1e6, + Timeout: 60 * time.Second, + BufferedByteLimit: 1e8, +} + +func (s *PublishSettings) toWireSettings() wire.PublishSettings { + wireSettings := wire.PublishSettings{ + DelayThreshold: DefaultPublishSettings.DelayThreshold, + CountThreshold: DefaultPublishSettings.CountThreshold, + ByteThreshold: DefaultPublishSettings.ByteThreshold, + Timeout: DefaultPublishSettings.Timeout, + BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + } + // Negative values preserved, but will fail validation in wire package. + if s.DelayThreshold != 0 { + wireSettings.DelayThreshold = s.DelayThreshold + } + if s.CountThreshold != 0 { + wireSettings.CountThreshold = s.CountThreshold + } + if s.ByteThreshold != 0 { + wireSettings.ByteThreshold = s.ByteThreshold + } + if s.Timeout != 0 { + wireSettings.Timeout = s.Timeout + } + if s.BufferedByteLimit != 0 { + wireSettings.BufferedByteLimit = s.BufferedByteLimit + } + return wireSettings +} + +// NackHandler is invoked when pubsub.Message.Nack() is called. Cloud Pub/Sub +// Lite does not have a concept of 'nack'. If the nack handler implementation +// returns nil, the message is acknowledged. If an error is returned, the +// SubscriberClient will consider this a fatal error and terminate. +// +// In Cloud Pub/Sub Lite, only a single subscriber for a given subscription is +// connected to any partition at a time, and there is no other client that may +// be able to handle messages. +type NackHandler func(*pubsub.Message) error + +// ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API +// proto to a pubsub.Message. If this returns an error, the SubscriberClient +// will consider this a fatal error and terminate. +type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error + +// ReceiveSettings configure the SubscriberClient. These settings apply per +// partition. If MaxOutstandingBytes is being used to bound memory usage, keep +// in mind the number of partitions in the associated topic. +// +// A zero ReceiveSettings will result in values equivalent to +// DefaultReceiveSettings. +type ReceiveSettings struct { + // MaxOutstandingMessages is the maximum number of unacknowledged messages. + // If MaxOutstandingMessages is 0, it will be treated as + // DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0. + MaxOutstandingMessages int + + // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged + // messages. If MaxOutstandingBytes is 0, it will be treated as + // DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0. + MaxOutstandingBytes int + + // The maximum time that the client will attempt to establish a subscribe + // stream connection to the server. If Timeout is 0, it will be treated as + // DefaultReceiveSettings.Timeout. Otherwise must be > 0. + // + // The timeout is exceeded, the SubscriberClient will terminate with the last + // error that occurred while trying to reconnect. + Timeout time.Duration + + // The topic partition numbers (zero-indexed) to receive messages from. + // Values must be less than the number of partitions for the topic. If not + // specified, the SubscriberClient will use the partition assignment service + // to determine which partitions it should connect to. + Partitions []int + + // Optional custom function to handle pubsub.Message.Nack() calls. If not set, + // the default behavior is to terminate the SubscriberClient. + NackHandler NackHandler + + // Optional custom function that transforms a SequencedMessage API proto to a + // pubsub.Message. + MessageTransformer ReceiveMessageTransformerFunc +} + +// DefaultReceiveSettings holds the default values for ReceiveSettings. +var DefaultReceiveSettings = ReceiveSettings{ + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 1e9, + Timeout: 60 * time.Second, +} + +func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings { + wireSettings := wire.ReceiveSettings{ + MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages, + MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes, + Timeout: DefaultReceiveSettings.Timeout, + Partitions: s.Partitions, + Framework: wire.FrameworkCloudPubSubShim, + } + // Negative values preserved, but will fail validation in wire package. + if s.MaxOutstandingMessages != 0 { + wireSettings.MaxOutstandingMessages = s.MaxOutstandingMessages + } + if s.MaxOutstandingBytes != 0 { + wireSettings.MaxOutstandingBytes = s.MaxOutstandingBytes + } + if s.Timeout != 0 { + wireSettings.Timeout = s.Timeout + } + return wireSettings +} diff --git a/pubsublite/ps/settings_test.go b/pubsublite/ps/settings_test.go new file mode 100644 index 00000000000..900af0efdcf --- /dev/null +++ b/pubsublite/ps/settings_test.go @@ -0,0 +1,154 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "testing" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsublite/internal/wire" +) + +func TestPublishSettingsToWireSettings(t *testing.T) { + for _, tc := range []struct { + desc string + settings PublishSettings + wantSettings wire.PublishSettings + }{ + { + desc: "default settings", + settings: DefaultPublishSettings, + wantSettings: wire.PublishSettings{ + DelayThreshold: DefaultPublishSettings.DelayThreshold, + CountThreshold: DefaultPublishSettings.CountThreshold, + ByteThreshold: DefaultPublishSettings.ByteThreshold, + Timeout: DefaultPublishSettings.Timeout, + BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "zero settings", + settings: PublishSettings{}, + wantSettings: DefaultPublishSettings.toWireSettings(), + }, + { + desc: "positive values", + settings: PublishSettings{ + DelayThreshold: 2, + CountThreshold: 3, + ByteThreshold: 4, + Timeout: 5, + BufferedByteLimit: 6, + }, + wantSettings: wire.PublishSettings{ + DelayThreshold: 2, + CountThreshold: 3, + ByteThreshold: 4, + Timeout: 5, + BufferedByteLimit: 6, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "negative values", + settings: PublishSettings{ + DelayThreshold: -2, + CountThreshold: -3, + ByteThreshold: -4, + Timeout: -5, + BufferedByteLimit: -6, + }, + wantSettings: wire.PublishSettings{ + DelayThreshold: -2, + CountThreshold: -3, + ByteThreshold: -4, + Timeout: -5, + BufferedByteLimit: -6, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" { + t.Errorf("PublishSettings.toWireSettings() got: -, want: +\n%s", diff) + } + }) + } +} + +func TestReceiveSettingsToWireSettings(t *testing.T) { + for _, tc := range []struct { + desc string + settings ReceiveSettings + wantSettings wire.ReceiveSettings + }{ + { + desc: "default settings", + settings: DefaultReceiveSettings, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages, + MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes, + Timeout: DefaultReceiveSettings.Timeout, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "zero settings", + settings: ReceiveSettings{}, + wantSettings: DefaultReceiveSettings.toWireSettings(), + }, + { + desc: "positive values", + settings: ReceiveSettings{ + MaxOutstandingMessages: 2, + MaxOutstandingBytes: 3, + Timeout: 4, + Partitions: []int{5, 6}, + }, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: 2, + MaxOutstandingBytes: 3, + Timeout: 4, + Partitions: []int{5, 6}, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "negative values", + settings: ReceiveSettings{ + MaxOutstandingMessages: -2, + MaxOutstandingBytes: -3, + Timeout: -4, + Partitions: []int{-5, -6}, + }, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: -2, + MaxOutstandingBytes: -3, + Timeout: -4, + Partitions: []int{-5, -6}, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" { + t.Errorf("ReceiveSettings.toWireSettings() got: -, want: +\n%s", diff) + } + }) + } +}