From 4a710f48303cccfb4d4011a1795c424d981724b4 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 1 Dec 2020 02:06:14 -0500 Subject: [PATCH 1/7] feat(pubsublite): publisher client for Cloud Pub/Sub shim --- pubsublite/go.mod | 2 + pubsublite/go.sum | 16 ++ pubsublite/internal/wire/errors.go | 10 +- pubsublite/internal/wire/publish_batcher.go | 3 +- pubsublite/ps/publisher.go | 167 +++++++++++++++ pubsublite/ps/publisher_test.go | 225 ++++++++++++++++++++ 6 files changed, 421 insertions(+), 2 deletions(-) create mode 100644 pubsublite/ps/publisher.go create mode 100644 pubsublite/ps/publisher_test.go diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 52b8dc2fe9b..3fd094c75a1 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -4,10 +4,12 @@ go 1.11 require ( cloud.google.com/go v0.72.0 + cloud.google.com/go/pubsub v1.8.1 github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.3 github.com/googleapis/gax-go/v2 v2.0.5 golang.org/x/tools v0.0.0-20201130220005-fd5f29369093 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/api v0.35.0 google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4 google.golang.org/grpc v1.33.2 diff --git a/pubsublite/go.sum b/pubsublite/go.sum index cdb86672756..548d8418dae 100644 --- a/pubsublite/go.sum +++ b/pubsublite/go.sum @@ -13,6 +13,7 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= +cloud.google.com/go v0.66.0/go.mod h1:dgqGAjKCDxyhGTtC9dAREQGUJpkceNm1yt590Qno0Ko= cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww= cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= @@ -27,6 +28,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= +cloud.google.com/go/pubsub v1.8.1 h1:xuYU5DhMssHSUEF2KtTN84fPj+bbdCg5EU+l5JUN/lc= +cloud.google.com/go/pubsub v1.8.1/go.mod h1:l0z+j0Y73f7AzQfozIhekdyS9Sh5HdK/Bn77LD3lIMY= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= @@ -100,6 +103,7 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20200905233945-acf8798be1f7/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -197,6 +201,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= @@ -243,6 +248,7 @@ golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f h1:Fqb3ao1hUmOR3GkUOg/Y+BadLwykBIzs5q8Ez2SbHyc= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= @@ -258,6 +264,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -298,7 +305,10 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.0.0-20200828161849-5deb26317202/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= +golang.org/x/tools v0.0.0-20200915173823-2db8f0ff891c/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= +golang.org/x/tools v0.0.0-20201001104356-43ebab892c4c/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201130220005-fd5f29369093 h1:jcS5z8APUbxKseehAWrePw6Wq3yk3kLtuJcbA4AysLA= golang.org/x/tools v0.0.0-20201130220005-fd5f29369093/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -323,6 +333,8 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= +google.golang.org/api v0.31.0/go.mod h1:CL+9IBCa2WWU6gRuBWaKqGWLFFwbEUXkfeMkHLQWYWo= +google.golang.org/api v0.32.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg= google.golang.org/api v0.35.0 h1:TBCmTTxUrRDA1iTctnK/fIeitxIZ+TQuaf0j29fmCGo= google.golang.org/api v0.35.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -361,7 +373,10 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20200831141814-d751682dd103/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20200914193844-75d14daec038/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20201001141541-efaab9d3c4f7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4 h1:Rt0FRalMgdSlXAVJvX4pr65KfqaxHXSLkSJRD9pw6g0= google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= @@ -378,6 +393,7 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.2 h1:EQyQC3sa8M+p6Ulc8yy9SWSS2GVwyRc83gAbG8lrl4o= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go index 5dc5bc10167..4073d652e88 100644 --- a/pubsublite/internal/wire/errors.go +++ b/pubsublite/internal/wire/errors.go @@ -13,13 +13,21 @@ package wire -import "errors" +import ( + "errors" + "fmt" +) +// Errors exported from this package. var ( // ErrOverflow indicates that the publish buffers have overflowed. See // comments for PublishSettings.BufferedByteLimit. ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed") + // ErrOversizedMessage indicates that the user published a message over the + // allowed serialized byte size limit. It is wrapped in another error. + ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishMessageBytes (%d)", MaxPublishMessageBytes) + // ErrServiceUninitialized indicates that a service (e.g. publisher or // subscriber) cannot perform an operation because it is uninitialized. ErrServiceUninitialized = errors.New("pubsublite: service must be started") diff --git a/pubsublite/internal/wire/publish_batcher.go b/pubsublite/internal/wire/publish_batcher.go index 8a32accf248..ca0cc693162 100644 --- a/pubsublite/internal/wire/publish_batcher.go +++ b/pubsublite/internal/wire/publish_batcher.go @@ -19,6 +19,7 @@ import ( "fmt" "cloud.google.com/go/pubsublite/common" + "golang.org/x/xerrors" "google.golang.org/api/support/bundler" "google.golang.org/protobuf/proto" @@ -110,7 +111,7 @@ func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult Publi msgSize := proto.Size(msg) switch { case msgSize > MaxPublishMessageBytes: - return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes) + return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage) case msgSize > b.availableBufferBytes: return ErrOverflow } diff --git a/pubsublite/ps/publisher.go b/pubsublite/ps/publisher.go new file mode 100644 index 00000000000..05827454004 --- /dev/null +++ b/pubsublite/ps/publisher.go @@ -0,0 +1,167 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "context" + "sync" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite" + "cloud.google.com/go/pubsublite/common" + "cloud.google.com/go/pubsublite/internal/wire" + "golang.org/x/xerrors" + "google.golang.org/api/option" + "google.golang.org/api/support/bundler" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +var ( + // ErrOverflow is set for a PublishResult when publish buffers overflow. + ErrOverflow = bundler.ErrOverflow + + // ErrOversizedMessage is set for a PublishResult when a published message + // exceeds MaxPublishMessageBytes. + ErrOversizedMessage = bundler.ErrOversizedItem + + // ErrPublisherStopped is set for a PublishResult when a message cannot be + // published because the publisher client has stopped. PublisherClient.Error() + // returns the error that caused the publisher client to terminate (if any). + ErrPublisherStopped = wire.ErrServiceStopped +) + +// translateError transforms a subset of errors to what would be returned by the +// pubsub package. +func translateError(err error) error { + if xerrors.Is(err, wire.ErrOversizedMessage) { + return ErrOversizedMessage + } + if xerrors.Is(err, wire.ErrOverflow) { + return ErrOverflow + } + return err +} + +// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given +// topic. +// +// See https://cloud.google.com/pubsub/lite/docs/publishing for more information +// about publishing. +type PublisherClient struct { + settings PublishSettings + wirePub wire.Publisher + + // Fields below must be guarded with mutex. + mu sync.Mutex + err error +} + +// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish +// messages to a given topic. +// +// See https://cloud.google.com/pubsub/lite/docs/publishing for more information +// about publishing. +func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) { + region, err := pubsublite.ZoneToRegion(topic.Zone) + if err != nil { + return nil, err + } + + // Note: ctx is not used to create the wire publisher, because if it is + // cancelled, the publisher will not be able to perform graceful shutdown + // (e.g. flush pending messages). + wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...) + if err != nil { + return nil, err + } + wirePub.Start() + if err := wirePub.WaitStarted(); err != nil { + return nil, err + } + return &PublisherClient{settings: settings, wirePub: wirePub}, nil +} + +// Publish publishes `msg` to the topic asynchronously. Messages are batched and +// sent according to the client's PublishSettings. Publish never blocks. +// +// Publish returns a non-nil PublishResult which will be ready when the +// message has been sent (or has failed to be sent) to the server. +// +// Once Stop() has been called or the publisher has failed permanently due to an +// error, future calls to Publish will immediately return a PublishResult with +// error ErrPublisherStopped. Error() returns the error that caused the +// publisher to terminate. +func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult { + result := pubsub.NewPublishResult() + msgpb := new(pb.PubSubMessage) + if err := p.transformMessage(msg, msgpb); err != nil { + pubsub.SetPublishResult(result, "", err) + p.setError(err) + p.Stop() + return result + } + + p.wirePub.Publish(msgpb, func(pm *common.PublishMetadata, err error) { + err = translateError(err) + if pm != nil { + pubsub.SetPublishResult(result, pm.String(), err) + } else { + pubsub.SetPublishResult(result, "", err) + } + }) + return result +} + +// Stop sends all remaining published messages and closes publish streams. +// Returns once all outstanding messages have been sent or have failed to be +// sent. +func (p *PublisherClient) Stop() { + p.wirePub.Stop() + p.wirePub.WaitStopped() +} + +// Error returns the error that caused the publisher client to terminate. It +// may be nil if Stop() was called. +func (p *PublisherClient) Error() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.err != nil { + return p.err + } + return p.wirePub.Error() +} + +func (p *PublisherClient) setError(err error) { + p.mu.Lock() + defer p.mu.Unlock() + + // Don't clobber original error. + if p.err == nil { + p.err = err + } +} + +func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error { + if p.settings.MessageTransformer != nil { + return p.settings.MessageTransformer(from, to) + } + + keyExtractor := p.settings.KeyExtractor + if keyExtractor == nil { + keyExtractor = extractOrderingKey + } + return transformPublishedMessage(from, to, keyExtractor) +} diff --git a/pubsublite/ps/publisher_test.go b/pubsublite/ps/publisher_test.go new file mode 100644 index 00000000000..dab70fd312a --- /dev/null +++ b/pubsublite/ps/publisher_test.go @@ -0,0 +1,225 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "context" + "errors" + "testing" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite/common" + "cloud.google.com/go/pubsublite/internal/test" + "cloud.google.com/go/pubsublite/internal/wire" + "google.golang.org/api/support/bundler" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// mockWirePublisher is a mock implementation of the wire.Publisher interface. +// It uses test.RPCVerifier to install fake PublishResults for each Publish +// call. +type mockWirePublisher struct { + Verifier *test.RPCVerifier + FakeErr error + Stopped bool +} + +func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.PublishResultFunc) { + resp, err := mp.Verifier.Pop(msg) + if err != nil { + onResult(nil, err) + return + } + result := resp.(*common.PublishMetadata) + onResult(result, nil) +} + +func (mp *mockWirePublisher) Start() {} +func (mp *mockWirePublisher) Stop() { mp.Stopped = true } +func (mp *mockWirePublisher) WaitStarted() error { return mp.FakeErr } +func (mp *mockWirePublisher) WaitStopped() error { return mp.FakeErr } +func (mp *mockWirePublisher) Error() error { return mp.FakeErr } + +func newTestPublisherClient(verifier *test.RPCVerifier, settings PublishSettings) *PublisherClient { + return &PublisherClient{ + settings: settings, + wirePub: &mockWirePublisher{Verifier: verifier}, + } +} + +func TestPublisherClientTransformMessage(t *testing.T) { + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + OrderingKey: "ordering_key", + Attributes: map[string]string{"attr": "value"}, + } + fakeResponse := &common.PublishMetadata{ + Partition: 2, + Offset: 42, + } + wantResultID := "2:42" + + for _, tc := range []struct { + desc string + // mutateSettings is passed a copy of DefaultPublishSettings to mutate. + mutateSettings func(settings *PublishSettings) + wantMsg *pb.PubSubMessage + }{ + { + desc: "default settings", + mutateSettings: func(settings *PublishSettings) {}, + wantMsg: &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("ordering_key"), + Attributes: map[string]*pb.AttributeValues{ + "attr": {Values: [][]byte{[]byte("value")}}, + }, + }, + }, + { + desc: "custom key extractor", + mutateSettings: func(settings *PublishSettings) { + settings.KeyExtractor = func(msg *pubsub.Message) []byte { + return msg.Data + } + }, + wantMsg: &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("data"), + Attributes: map[string]*pb.AttributeValues{ + "attr": {Values: [][]byte{[]byte("value")}}, + }, + }, + }, + { + desc: "custom message transformer", + mutateSettings: func(settings *PublishSettings) { + settings.KeyExtractor = func(msg *pubsub.Message) []byte { + return msg.Data + } + settings.MessageTransformer = func(from *pubsub.Message, to *pb.PubSubMessage) error { + // Swaps data and key. + to.Data = []byte(from.OrderingKey) + to.Key = from.Data + return nil + } + }, + wantMsg: &pb.PubSubMessage{ + Data: []byte("ordering_key"), + Key: []byte("data"), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + settings := DefaultPublishSettings + tc.mutateSettings(&settings) + + verifier := test.NewRPCVerifier(t) + verifier.Push(tc.wantMsg, fakeResponse, nil) + defer verifier.Flush() + + pubClient := newTestPublisherClient(verifier, settings) + result := pubClient.Publish(ctx, input) + + gotID, err := result.Get(ctx) + if err != nil { + t.Errorf("Publish() got err: %v", err) + } + if gotID != wantResultID { + t.Errorf("Publish() got id: %q, want: %q", gotID, wantResultID) + } + }) + } +} + +func TestPublisherClientTransformMessageError(t *testing.T) { + wantErr := errors.New("message could not be converted") + + settings := DefaultPublishSettings + settings.MessageTransformer = func(_ *pubsub.Message, _ *pb.PubSubMessage) error { + return wantErr + } + + // No publish calls expected. + verifier := test.NewRPCVerifier(t) + defer verifier.Flush() + + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + } + pubClient := newTestPublisherClient(verifier, settings) + result := pubClient.Publish(ctx, input) + + _, gotErr := result.Get(ctx) + if !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, wantErr) + } + if !test.ErrorEqual(pubClient.Error(), wantErr) { + t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), wantErr) + } + if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, true; got != want { + t.Errorf("Publisher.Stopped: got %v, want %v", got, want) + } +} + +func TestPublisherClientTranslatePublishResultErrors(t *testing.T) { + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + OrderingKey: "ordering_key", + } + wantMsg := &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("ordering_key"), + } + + for _, tc := range []struct { + desc string + wireErr error + wantErr error + }{ + { + desc: "oversized message", + wireErr: wire.ErrOversizedMessage, + wantErr: bundler.ErrOversizedItem, + }, + { + desc: "buffer overflow", + wireErr: wire.ErrOverflow, + wantErr: bundler.ErrOverflow, + }, + { + desc: "service stopped", + wireErr: wire.ErrServiceStopped, + wantErr: wire.ErrServiceStopped, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + verifier := test.NewRPCVerifier(t) + verifier.Push(wantMsg, nil, tc.wireErr) + defer verifier.Flush() + + pubClient := newTestPublisherClient(verifier, DefaultPublishSettings) + result := pubClient.Publish(ctx, input) + + _, gotErr := result.Get(ctx) + if !test.ErrorEqual(gotErr, tc.wantErr) { + t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, tc.wantErr) + } + }) + } +} From 882445ee966a3fb15d8b6d6e8ca4a87b4322ef89 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 1 Dec 2020 18:07:54 -0500 Subject: [PATCH 2/7] Inline doc examples --- pubsublite/ps/example_test.go | 74 +++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 pubsublite/ps/example_test.go diff --git a/pubsublite/ps/example_test.go b/pubsublite/ps/example_test.go new file mode 100644 index 00000000000..957ffdd79f6 --- /dev/null +++ b/pubsublite/ps/example_test.go @@ -0,0 +1,74 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps_test + +import ( + "context" + "fmt" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite" + "cloud.google.com/go/pubsublite/ps" +) + +func ExamplePublisherClient_Publish() { + ctx := context.Background() + topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"} + publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var results []*pubsub.PublishResult + r := publisher.Publish(ctx, &pubsub.Message{ + Data: []byte("hello world"), + }) + results = append(results, r) + // Do other work ... + for _, r := range results { + id, err := r.Get(ctx) + if err != nil { + // TODO: Handle error. + } + fmt.Printf("Published a message with a message ID: %s\n", id) + } +} + +func ExamplePublisherClient_Error() { + ctx := context.Background() + topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"} + publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var results []*pubsub.PublishResult + r := publisher.Publish(ctx, &pubsub.Message{ + Data: []byte("hello world"), + }) + results = append(results, r) + // Do other work ... + for _, r := range results { + id, err := r.Get(ctx) + if err != nil { + // TODO: Handle error. + if err == ps.ErrPublisherStopped { + fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error()) + } + } + fmt.Printf("Published a message with a message ID: %s\n", id) + } +} From 4c6ce9e090af74dd0878a7ccc703b17229bab67a Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 2 Dec 2020 04:02:10 -0500 Subject: [PATCH 3/7] Minor unit test update --- pubsublite/ps/publisher_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pubsublite/ps/publisher_test.go b/pubsublite/ps/publisher_test.go index dab70fd312a..19e467580be 100644 --- a/pubsublite/ps/publisher_test.go +++ b/pubsublite/ps/publisher_test.go @@ -32,13 +32,14 @@ import ( // call. type mockWirePublisher struct { Verifier *test.RPCVerifier - FakeErr error Stopped bool + err error } func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.PublishResultFunc) { resp, err := mp.Verifier.Pop(msg) if err != nil { + mp.err = err onResult(nil, err) return } @@ -48,9 +49,9 @@ func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.Publis func (mp *mockWirePublisher) Start() {} func (mp *mockWirePublisher) Stop() { mp.Stopped = true } -func (mp *mockWirePublisher) WaitStarted() error { return mp.FakeErr } -func (mp *mockWirePublisher) WaitStopped() error { return mp.FakeErr } -func (mp *mockWirePublisher) Error() error { return mp.FakeErr } +func (mp *mockWirePublisher) WaitStarted() error { return mp.err } +func (mp *mockWirePublisher) WaitStopped() error { return mp.err } +func (mp *mockWirePublisher) Error() error { return mp.err } func newTestPublisherClient(verifier *test.RPCVerifier, settings PublishSettings) *PublisherClient { return &PublisherClient{ @@ -220,6 +221,12 @@ func TestPublisherClientTranslatePublishResultErrors(t *testing.T) { if !test.ErrorEqual(gotErr, tc.wantErr) { t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, tc.wantErr) } + if !test.ErrorEqual(pubClient.Error(), tc.wireErr) { + t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), tc.wireErr) + } + if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, false; got != want { + t.Errorf("Publisher.Stopped: got %v, want %v", got, want) + } }) } } From de1b1f3aabc4a1766fd4c4a8c44eb3432762c724 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 8 Dec 2020 19:23:53 -0500 Subject: [PATCH 4/7] We decided to remove the 1 MiB per-message publish limit --- pubsublite/internal/wire/errors.go | 2 +- pubsublite/internal/wire/publish_batcher.go | 2 +- .../internal/wire/publish_batcher_test.go | 6 +++--- pubsublite/internal/wire/settings.go | 18 +++++++++++++----- pubsublite/ps/publisher.go | 4 ++-- 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go index 4073d652e88..2953dca7011 100644 --- a/pubsublite/internal/wire/errors.go +++ b/pubsublite/internal/wire/errors.go @@ -26,7 +26,7 @@ var ( // ErrOversizedMessage indicates that the user published a message over the // allowed serialized byte size limit. It is wrapped in another error. - ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishMessageBytes (%d)", MaxPublishMessageBytes) + ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes) // ErrServiceUninitialized indicates that a service (e.g. publisher or // subscriber) cannot perform an operation because it is uninitialized. diff --git a/pubsublite/internal/wire/publish_batcher.go b/pubsublite/internal/wire/publish_batcher.go index ca0cc693162..ee4dadcd0c0 100644 --- a/pubsublite/internal/wire/publish_batcher.go +++ b/pubsublite/internal/wire/publish_batcher.go @@ -110,7 +110,7 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error { msgSize := proto.Size(msg) switch { - case msgSize > MaxPublishMessageBytes: + case msgSize > MaxPublishRequestBytes: return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage) case msgSize > b.availableBufferBytes: return ErrOverflow diff --git a/pubsublite/internal/wire/publish_batcher_test.go b/pubsublite/internal/wire/publish_batcher_test.go index 3e12fe2bef1..79a2ab62067 100644 --- a/pubsublite/internal/wire/publish_batcher_test.go +++ b/pubsublite/internal/wire/publish_batcher_test.go @@ -146,7 +146,7 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver } func TestPublishBatcherAddMessage(t *testing.T) { - const initAvailableBytes = MaxPublishMessageBytes + 1 + const initAvailableBytes = MaxPublishRequestBytes settings := DefaultPublishSettings settings.BufferedByteLimit = initAvailableBytes @@ -178,8 +178,8 @@ func TestPublishBatcherAddMessage(t *testing.T) { }) t.Run("oversized message", func(t *testing.T) { - msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)} - if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { + msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)} + if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg) } }) diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index e2cc5699746..d4f97bf63fb 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -24,16 +24,18 @@ const ( // batched in a single publish request. MaxPublishRequestCount = 1000 - // MaxPublishMessageBytes is the maximum allowed serialized size of a single - // Pub/Sub message in bytes. - MaxPublishMessageBytes = 1000000 - // MaxPublishRequestBytes is the maximum allowed serialized size of a single // publish request (containing a batch of messages) in bytes. Must be lower // than the gRPC limit of 4 MiB. - MaxPublishRequestBytes = 3500000 + MaxPublishRequestBytes int = 3.5 * 1024 * 1024 ) +// FrameworkType is the user-facing API for Cloud Pub/Sub Lite. +type FrameworkType string + +// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub. +const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM" + // PublishSettings control the batching of published messages. These settings // apply per partition. type PublishSettings struct { @@ -70,6 +72,9 @@ type PublishSettings struct { // The polling interval to watch for topic partition count updates. Set to 0 // to disable polling if the number of partitions will never update. ConfigPollPeriod time.Duration + + // The user-facing API type. + Framework FrameworkType } // DefaultPublishSettings holds the default values for PublishSettings. @@ -132,6 +137,9 @@ type ReceiveSettings struct { // specified, the client will use the partition assignment service to // determine which partitions it should connect to. Partitions []int + + // The user-facing API type. + Framework FrameworkType } // DefaultReceiveSettings holds the default values for ReceiveSettings. diff --git a/pubsublite/ps/publisher.go b/pubsublite/ps/publisher.go index 05827454004..93640220c51 100644 --- a/pubsublite/ps/publisher.go +++ b/pubsublite/ps/publisher.go @@ -33,7 +33,7 @@ var ( ErrOverflow = bundler.ErrOverflow // ErrOversizedMessage is set for a PublishResult when a published message - // exceeds MaxPublishMessageBytes. + // exceeds MaxPublishRequestBytes. ErrOversizedMessage = bundler.ErrOversizedItem // ErrPublisherStopped is set for a PublishResult when a message cannot be @@ -55,7 +55,7 @@ func translateError(err error) error { } // PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given -// topic. +// topic. A PublisherClient is safe to use from multiple goroutines. // // See https://cloud.google.com/pubsub/lite/docs/publishing for more information // about publishing. From 622ac80101dfe11f72e3397d4b64e2e2fd55a22a Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 10 Dec 2020 20:20:54 -0500 Subject: [PATCH 5/7] Update to pubsub v.1.9.1 and fix imports --- pubsublite/ps/publisher.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pubsublite/ps/publisher.go b/pubsublite/ps/publisher.go index 93640220c51..8e943f67c54 100644 --- a/pubsublite/ps/publisher.go +++ b/pubsublite/ps/publisher.go @@ -25,6 +25,7 @@ import ( "google.golang.org/api/option" "google.golang.org/api/support/bundler" + ipubsub "cloud.google.com/go/internal/pubsub" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" ) @@ -104,10 +105,10 @@ func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pub // error ErrPublisherStopped. Error() returns the error that caused the // publisher to terminate. func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult { - result := pubsub.NewPublishResult() + result := ipubsub.NewPublishResult() msgpb := new(pb.PubSubMessage) if err := p.transformMessage(msg, msgpb); err != nil { - pubsub.SetPublishResult(result, "", err) + ipubsub.SetPublishResult(result, "", err) p.setError(err) p.Stop() return result @@ -116,9 +117,9 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub p.wirePub.Publish(msgpb, func(pm *common.PublishMetadata, err error) { err = translateError(err) if pm != nil { - pubsub.SetPublishResult(result, pm.String(), err) + ipubsub.SetPublishResult(result, pm.String(), err) } else { - pubsub.SetPublishResult(result, "", err) + ipubsub.SetPublishResult(result, "", err) } }) return result From 4eccaefbaddea13c942dd65773f5db7be885d592 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 22 Dec 2020 19:32:23 -0500 Subject: [PATCH 6/7] Fix merge --- pubsublite/ps/publisher.go | 2 +- pubsublite/ps/publisher_test.go | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pubsublite/ps/publisher.go b/pubsublite/ps/publisher.go index cd9f23547e6..ead99913551 100644 --- a/pubsublite/ps/publisher.go +++ b/pubsublite/ps/publisher.go @@ -110,7 +110,7 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub if err := p.transformMessage(msg, msgpb); err != nil { ipubsub.SetPublishResult(result, "", err) p.setError(err) - p.Stop() + p.wirePub.Stop() return result } diff --git a/pubsublite/ps/publisher_test.go b/pubsublite/ps/publisher_test.go index 19e467580be..3629b636b69 100644 --- a/pubsublite/ps/publisher_test.go +++ b/pubsublite/ps/publisher_test.go @@ -19,9 +19,10 @@ import ( "testing" "cloud.google.com/go/pubsub" - "cloud.google.com/go/pubsublite/common" "cloud.google.com/go/pubsublite/internal/test" "cloud.google.com/go/pubsublite/internal/wire" + "cloud.google.com/go/pubsublite/publish" + "golang.org/x/xerrors" "google.golang.org/api/support/bundler" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" @@ -43,7 +44,7 @@ func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.Publis onResult(nil, err) return } - result := resp.(*common.PublishMetadata) + result := resp.(*publish.Metadata) onResult(result, nil) } @@ -67,7 +68,7 @@ func TestPublisherClientTransformMessage(t *testing.T) { OrderingKey: "ordering_key", Attributes: map[string]string{"attr": "value"}, } - fakeResponse := &common.PublishMetadata{ + fakeResponse := &publish.Metadata{ Partition: 2, Offset: 42, } @@ -198,6 +199,11 @@ func TestPublisherClientTranslatePublishResultErrors(t *testing.T) { wireErr: wire.ErrOversizedMessage, wantErr: bundler.ErrOversizedItem, }, + { + desc: "oversized message wrapped", + wireErr: xerrors.Errorf("placeholder error message: %w", wire.ErrOversizedMessage), + wantErr: bundler.ErrOversizedItem, + }, { desc: "buffer overflow", wireErr: wire.ErrOverflow, From 7b9810939f794a885deca13f0bb957fdf13b7328 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 22 Dec 2020 20:26:02 -0500 Subject: [PATCH 7/7] Fix Go 1.11 test failure --- pubsublite/internal/wire/assigner_test.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/pubsublite/internal/wire/assigner_test.go b/pubsublite/internal/wire/assigner_test.go index 4c7c3f02e36..64e761f1993 100644 --- a/pubsublite/internal/wire/assigner_test.go +++ b/pubsublite/internal/wire/assigner_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "sort" + "sync" "testing" "time" @@ -62,7 +63,8 @@ func fakeGenerateUUID() (uuid.UUID, error) { // testAssigner wraps an assigner for ease of testing. type testAssigner struct { // Fake error to simulate receiver unable to handle assignment. - RetError error + recvError error + mu sync.Mutex t *testing.T asn *assigner @@ -96,12 +98,20 @@ func (ta *testAssigner) receiveAssignment(partitions partitionSet) error { sort.Ints(p) ta.partitions <- p - if ta.RetError != nil { - return ta.RetError + ta.mu.Lock() + defer ta.mu.Unlock() + if ta.recvError != nil { + return ta.recvError } return nil } +func (ta *testAssigner) SetReceiveError(err error) { + ta.mu.Lock() + defer ta.mu.Unlock() + ta.recvError = err +} + func (ta *testAssigner) NextPartitions() []int { select { case <-time.After(serviceTestWaitTimeout): @@ -186,7 +196,8 @@ func TestAssignerHandlePartitionFailure(t *testing.T) { asn := newTestAssigner(t, subscription) // Simulates the assigningSubscriber discarding assignments. - asn.RetError = errors.New("subscriber shutting down") + wantErr := errors.New("subscriber shutting down") + asn.SetReceiveError(wantErr) if gotErr := asn.StartError(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) @@ -194,7 +205,7 @@ func TestAssignerHandlePartitionFailure(t *testing.T) { if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) { t.Errorf("Partition assignments: got %v, want %v", got, want) } - if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) { - t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError) + if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) } }