From 1648ea06bbb08c3452f79551a9d45147379f13e4 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 23 Dec 2020 12:43:10 +1100 Subject: [PATCH] feat(pubsublite): publisher client (#3303) PublisherClient wraps Pub/Sub Lite's wire.Publisher and emulates the pubsub.Topic.Publish API. --- pubsublite/go.mod | 1 + pubsublite/internal/wire/assigner_test.go | 23 +- pubsublite/internal/wire/errors.go | 10 +- pubsublite/internal/wire/publish_batcher.go | 5 +- .../internal/wire/publish_batcher_test.go | 6 +- pubsublite/internal/wire/settings.go | 4 - pubsublite/ps/example_test.go | 74 ++++++ pubsublite/ps/publisher.go | 168 +++++++++++++ pubsublite/ps/publisher_test.go | 238 ++++++++++++++++++ 9 files changed, 513 insertions(+), 16 deletions(-) create mode 100644 pubsublite/ps/example_test.go create mode 100644 pubsublite/ps/publisher.go create mode 100644 pubsublite/ps/publisher_test.go diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 22309bf0107..fbb7d6c67aa 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -9,6 +9,7 @@ require ( github.com/google/go-cmp v0.5.4 github.com/google/uuid v1.1.2 github.com/googleapis/gax-go/v2 v2.0.5 + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/api v0.36.0 google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a google.golang.org/grpc v1.34.0 diff --git a/pubsublite/internal/wire/assigner_test.go b/pubsublite/internal/wire/assigner_test.go index 4c7c3f02e36..64e761f1993 100644 --- a/pubsublite/internal/wire/assigner_test.go +++ b/pubsublite/internal/wire/assigner_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "sort" + "sync" "testing" "time" @@ -62,7 +63,8 @@ func fakeGenerateUUID() (uuid.UUID, error) { // testAssigner wraps an assigner for ease of testing. type testAssigner struct { // Fake error to simulate receiver unable to handle assignment. - RetError error + recvError error + mu sync.Mutex t *testing.T asn *assigner @@ -96,12 +98,20 @@ func (ta *testAssigner) receiveAssignment(partitions partitionSet) error { sort.Ints(p) ta.partitions <- p - if ta.RetError != nil { - return ta.RetError + ta.mu.Lock() + defer ta.mu.Unlock() + if ta.recvError != nil { + return ta.recvError } return nil } +func (ta *testAssigner) SetReceiveError(err error) { + ta.mu.Lock() + defer ta.mu.Unlock() + ta.recvError = err +} + func (ta *testAssigner) NextPartitions() []int { select { case <-time.After(serviceTestWaitTimeout): @@ -186,7 +196,8 @@ func TestAssignerHandlePartitionFailure(t *testing.T) { asn := newTestAssigner(t, subscription) // Simulates the assigningSubscriber discarding assignments. - asn.RetError = errors.New("subscriber shutting down") + wantErr := errors.New("subscriber shutting down") + asn.SetReceiveError(wantErr) if gotErr := asn.StartError(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) @@ -194,7 +205,7 @@ func TestAssignerHandlePartitionFailure(t *testing.T) { if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) { t.Errorf("Partition assignments: got %v, want %v", got, want) } - if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) { - t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError) + if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) } } diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go index 5dc5bc10167..2953dca7011 100644 --- a/pubsublite/internal/wire/errors.go +++ b/pubsublite/internal/wire/errors.go @@ -13,13 +13,21 @@ package wire -import "errors" +import ( + "errors" + "fmt" +) +// Errors exported from this package. var ( // ErrOverflow indicates that the publish buffers have overflowed. See // comments for PublishSettings.BufferedByteLimit. ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed") + // ErrOversizedMessage indicates that the user published a message over the + // allowed serialized byte size limit. It is wrapped in another error. + ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes) + // ErrServiceUninitialized indicates that a service (e.g. publisher or // subscriber) cannot perform an operation because it is uninitialized. ErrServiceUninitialized = errors.New("pubsublite: service must be started") diff --git a/pubsublite/internal/wire/publish_batcher.go b/pubsublite/internal/wire/publish_batcher.go index a6d150565f2..1fa8ed134fb 100644 --- a/pubsublite/internal/wire/publish_batcher.go +++ b/pubsublite/internal/wire/publish_batcher.go @@ -19,6 +19,7 @@ import ( "fmt" "cloud.google.com/go/pubsublite/publish" + "golang.org/x/xerrors" "google.golang.org/api/support/bundler" "google.golang.org/protobuf/proto" @@ -109,8 +110,8 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error { msgSize := proto.Size(msg) switch { - case msgSize > MaxPublishMessageBytes: - return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes) + case msgSize > MaxPublishRequestBytes: + return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage) case msgSize > b.availableBufferBytes: return ErrOverflow } diff --git a/pubsublite/internal/wire/publish_batcher_test.go b/pubsublite/internal/wire/publish_batcher_test.go index b36fed6fe75..9f8c6a6bc42 100644 --- a/pubsublite/internal/wire/publish_batcher_test.go +++ b/pubsublite/internal/wire/publish_batcher_test.go @@ -146,7 +146,7 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver } func TestPublishBatcherAddMessage(t *testing.T) { - const initAvailableBytes = MaxPublishMessageBytes + 1 + const initAvailableBytes = MaxPublishRequestBytes settings := DefaultPublishSettings settings.BufferedByteLimit = initAvailableBytes @@ -178,8 +178,8 @@ func TestPublishBatcherAddMessage(t *testing.T) { }) t.Run("oversized message", func(t *testing.T) { - msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)} - if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { + msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)} + if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg) } }) diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index f1dcd6cb0ea..d4f97bf63fb 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -24,10 +24,6 @@ const ( // batched in a single publish request. MaxPublishRequestCount = 1000 - // MaxPublishMessageBytes is the maximum allowed serialized size of a single - // Pub/Sub message in bytes. - MaxPublishMessageBytes = 1000000 - // MaxPublishRequestBytes is the maximum allowed serialized size of a single // publish request (containing a batch of messages) in bytes. Must be lower // than the gRPC limit of 4 MiB. diff --git a/pubsublite/ps/example_test.go b/pubsublite/ps/example_test.go new file mode 100644 index 00000000000..957ffdd79f6 --- /dev/null +++ b/pubsublite/ps/example_test.go @@ -0,0 +1,74 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps_test + +import ( + "context" + "fmt" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite" + "cloud.google.com/go/pubsublite/ps" +) + +func ExamplePublisherClient_Publish() { + ctx := context.Background() + topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"} + publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var results []*pubsub.PublishResult + r := publisher.Publish(ctx, &pubsub.Message{ + Data: []byte("hello world"), + }) + results = append(results, r) + // Do other work ... + for _, r := range results { + id, err := r.Get(ctx) + if err != nil { + // TODO: Handle error. + } + fmt.Printf("Published a message with a message ID: %s\n", id) + } +} + +func ExamplePublisherClient_Error() { + ctx := context.Background() + topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"} + publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var results []*pubsub.PublishResult + r := publisher.Publish(ctx, &pubsub.Message{ + Data: []byte("hello world"), + }) + results = append(results, r) + // Do other work ... + for _, r := range results { + id, err := r.Get(ctx) + if err != nil { + // TODO: Handle error. + if err == ps.ErrPublisherStopped { + fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error()) + } + } + fmt.Printf("Published a message with a message ID: %s\n", id) + } +} diff --git a/pubsublite/ps/publisher.go b/pubsublite/ps/publisher.go new file mode 100644 index 00000000000..ead99913551 --- /dev/null +++ b/pubsublite/ps/publisher.go @@ -0,0 +1,168 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "context" + "sync" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite" + "cloud.google.com/go/pubsublite/internal/wire" + "cloud.google.com/go/pubsublite/publish" + "golang.org/x/xerrors" + "google.golang.org/api/option" + "google.golang.org/api/support/bundler" + + ipubsub "cloud.google.com/go/internal/pubsub" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +var ( + // ErrOverflow is set for a PublishResult when publish buffers overflow. + ErrOverflow = bundler.ErrOverflow + + // ErrOversizedMessage is set for a PublishResult when a published message + // exceeds MaxPublishRequestBytes. + ErrOversizedMessage = bundler.ErrOversizedItem + + // ErrPublisherStopped is set for a PublishResult when a message cannot be + // published because the publisher client has stopped. PublisherClient.Error() + // returns the error that caused the publisher client to terminate (if any). + ErrPublisherStopped = wire.ErrServiceStopped +) + +// translateError transforms a subset of errors to what would be returned by the +// pubsub package. +func translateError(err error) error { + if xerrors.Is(err, wire.ErrOversizedMessage) { + return ErrOversizedMessage + } + if xerrors.Is(err, wire.ErrOverflow) { + return ErrOverflow + } + return err +} + +// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given +// topic. A PublisherClient is safe to use from multiple goroutines. +// +// See https://cloud.google.com/pubsub/lite/docs/publishing for more information +// about publishing. +type PublisherClient struct { + settings PublishSettings + wirePub wire.Publisher + + // Fields below must be guarded with mutex. + mu sync.Mutex + err error +} + +// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish +// messages to a given topic. +// +// See https://cloud.google.com/pubsub/lite/docs/publishing for more information +// about publishing. +func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) { + region, err := pubsublite.ZoneToRegion(topic.Zone) + if err != nil { + return nil, err + } + + // Note: ctx is not used to create the wire publisher, because if it is + // cancelled, the publisher will not be able to perform graceful shutdown + // (e.g. flush pending messages). + wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...) + if err != nil { + return nil, err + } + wirePub.Start() + if err := wirePub.WaitStarted(); err != nil { + return nil, err + } + return &PublisherClient{settings: settings, wirePub: wirePub}, nil +} + +// Publish publishes `msg` to the topic asynchronously. Messages are batched and +// sent according to the client's PublishSettings. Publish never blocks. +// +// Publish returns a non-nil PublishResult which will be ready when the +// message has been sent (or has failed to be sent) to the server. +// +// Once Stop() has been called or the publisher has failed permanently due to an +// error, future calls to Publish will immediately return a PublishResult with +// error ErrPublisherStopped. Error() returns the error that caused the +// publisher to terminate. +func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult { + result := ipubsub.NewPublishResult() + msgpb := new(pb.PubSubMessage) + if err := p.transformMessage(msg, msgpb); err != nil { + ipubsub.SetPublishResult(result, "", err) + p.setError(err) + p.wirePub.Stop() + return result + } + + p.wirePub.Publish(msgpb, func(pm *publish.Metadata, err error) { + err = translateError(err) + if pm != nil { + ipubsub.SetPublishResult(result, pm.String(), err) + } else { + ipubsub.SetPublishResult(result, "", err) + } + }) + return result +} + +// Stop sends all remaining published messages and closes publish streams. +// Returns once all outstanding messages have been sent or have failed to be +// sent. +func (p *PublisherClient) Stop() { + p.wirePub.Stop() + p.wirePub.WaitStopped() +} + +// Error returns the error that caused the publisher client to terminate. It +// may be nil if Stop() was called. +func (p *PublisherClient) Error() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.err != nil { + return p.err + } + return p.wirePub.Error() +} + +func (p *PublisherClient) setError(err error) { + p.mu.Lock() + defer p.mu.Unlock() + + // Don't clobber original error. + if p.err == nil { + p.err = err + } +} + +func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error { + if p.settings.MessageTransformer != nil { + return p.settings.MessageTransformer(from, to) + } + + keyExtractor := p.settings.KeyExtractor + if keyExtractor == nil { + keyExtractor = extractOrderingKey + } + return transformPublishedMessage(from, to, keyExtractor) +} diff --git a/pubsublite/ps/publisher_test.go b/pubsublite/ps/publisher_test.go new file mode 100644 index 00000000000..3629b636b69 --- /dev/null +++ b/pubsublite/ps/publisher_test.go @@ -0,0 +1,238 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "context" + "errors" + "testing" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite/internal/test" + "cloud.google.com/go/pubsublite/internal/wire" + "cloud.google.com/go/pubsublite/publish" + "golang.org/x/xerrors" + "google.golang.org/api/support/bundler" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// mockWirePublisher is a mock implementation of the wire.Publisher interface. +// It uses test.RPCVerifier to install fake PublishResults for each Publish +// call. +type mockWirePublisher struct { + Verifier *test.RPCVerifier + Stopped bool + err error +} + +func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.PublishResultFunc) { + resp, err := mp.Verifier.Pop(msg) + if err != nil { + mp.err = err + onResult(nil, err) + return + } + result := resp.(*publish.Metadata) + onResult(result, nil) +} + +func (mp *mockWirePublisher) Start() {} +func (mp *mockWirePublisher) Stop() { mp.Stopped = true } +func (mp *mockWirePublisher) WaitStarted() error { return mp.err } +func (mp *mockWirePublisher) WaitStopped() error { return mp.err } +func (mp *mockWirePublisher) Error() error { return mp.err } + +func newTestPublisherClient(verifier *test.RPCVerifier, settings PublishSettings) *PublisherClient { + return &PublisherClient{ + settings: settings, + wirePub: &mockWirePublisher{Verifier: verifier}, + } +} + +func TestPublisherClientTransformMessage(t *testing.T) { + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + OrderingKey: "ordering_key", + Attributes: map[string]string{"attr": "value"}, + } + fakeResponse := &publish.Metadata{ + Partition: 2, + Offset: 42, + } + wantResultID := "2:42" + + for _, tc := range []struct { + desc string + // mutateSettings is passed a copy of DefaultPublishSettings to mutate. + mutateSettings func(settings *PublishSettings) + wantMsg *pb.PubSubMessage + }{ + { + desc: "default settings", + mutateSettings: func(settings *PublishSettings) {}, + wantMsg: &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("ordering_key"), + Attributes: map[string]*pb.AttributeValues{ + "attr": {Values: [][]byte{[]byte("value")}}, + }, + }, + }, + { + desc: "custom key extractor", + mutateSettings: func(settings *PublishSettings) { + settings.KeyExtractor = func(msg *pubsub.Message) []byte { + return msg.Data + } + }, + wantMsg: &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("data"), + Attributes: map[string]*pb.AttributeValues{ + "attr": {Values: [][]byte{[]byte("value")}}, + }, + }, + }, + { + desc: "custom message transformer", + mutateSettings: func(settings *PublishSettings) { + settings.KeyExtractor = func(msg *pubsub.Message) []byte { + return msg.Data + } + settings.MessageTransformer = func(from *pubsub.Message, to *pb.PubSubMessage) error { + // Swaps data and key. + to.Data = []byte(from.OrderingKey) + to.Key = from.Data + return nil + } + }, + wantMsg: &pb.PubSubMessage{ + Data: []byte("ordering_key"), + Key: []byte("data"), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + settings := DefaultPublishSettings + tc.mutateSettings(&settings) + + verifier := test.NewRPCVerifier(t) + verifier.Push(tc.wantMsg, fakeResponse, nil) + defer verifier.Flush() + + pubClient := newTestPublisherClient(verifier, settings) + result := pubClient.Publish(ctx, input) + + gotID, err := result.Get(ctx) + if err != nil { + t.Errorf("Publish() got err: %v", err) + } + if gotID != wantResultID { + t.Errorf("Publish() got id: %q, want: %q", gotID, wantResultID) + } + }) + } +} + +func TestPublisherClientTransformMessageError(t *testing.T) { + wantErr := errors.New("message could not be converted") + + settings := DefaultPublishSettings + settings.MessageTransformer = func(_ *pubsub.Message, _ *pb.PubSubMessage) error { + return wantErr + } + + // No publish calls expected. + verifier := test.NewRPCVerifier(t) + defer verifier.Flush() + + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + } + pubClient := newTestPublisherClient(verifier, settings) + result := pubClient.Publish(ctx, input) + + _, gotErr := result.Get(ctx) + if !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, wantErr) + } + if !test.ErrorEqual(pubClient.Error(), wantErr) { + t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), wantErr) + } + if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, true; got != want { + t.Errorf("Publisher.Stopped: got %v, want %v", got, want) + } +} + +func TestPublisherClientTranslatePublishResultErrors(t *testing.T) { + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + OrderingKey: "ordering_key", + } + wantMsg := &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("ordering_key"), + } + + for _, tc := range []struct { + desc string + wireErr error + wantErr error + }{ + { + desc: "oversized message", + wireErr: wire.ErrOversizedMessage, + wantErr: bundler.ErrOversizedItem, + }, + { + desc: "oversized message wrapped", + wireErr: xerrors.Errorf("placeholder error message: %w", wire.ErrOversizedMessage), + wantErr: bundler.ErrOversizedItem, + }, + { + desc: "buffer overflow", + wireErr: wire.ErrOverflow, + wantErr: bundler.ErrOverflow, + }, + { + desc: "service stopped", + wireErr: wire.ErrServiceStopped, + wantErr: wire.ErrServiceStopped, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + verifier := test.NewRPCVerifier(t) + verifier.Push(wantMsg, nil, tc.wireErr) + defer verifier.Flush() + + pubClient := newTestPublisherClient(verifier, DefaultPublishSettings) + result := pubClient.Publish(ctx, input) + + _, gotErr := result.Get(ctx) + if !test.ErrorEqual(gotErr, tc.wantErr) { + t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, tc.wantErr) + } + if !test.ErrorEqual(pubClient.Error(), tc.wireErr) { + t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), tc.wireErr) + } + if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, false; got != want { + t.Errorf("Publisher.Stopped: got %v, want %v", got, want) + } + }) + } +}