Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): publisher client (#3303)
PublisherClient wraps Pub/Sub Lite's wire.Publisher and emulates the pubsub.Topic.Publish API.
  • Loading branch information
tmdiep committed Dec 23, 2020
1 parent 74923c2 commit 1648ea0
Show file tree
Hide file tree
Showing 9 changed files with 513 additions and 16 deletions.
1 change: 1 addition & 0 deletions pubsublite/go.mod
Expand Up @@ -9,6 +9,7 @@ require (
github.com/google/go-cmp v0.5.4
github.com/google/uuid v1.1.2
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a
google.golang.org/grpc v1.34.0
Expand Down
23 changes: 17 additions & 6 deletions pubsublite/internal/wire/assigner_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -62,7 +63,8 @@ func fakeGenerateUUID() (uuid.UUID, error) {
// testAssigner wraps an assigner for ease of testing.
type testAssigner struct {
// Fake error to simulate receiver unable to handle assignment.
RetError error
recvError error
mu sync.Mutex

t *testing.T
asn *assigner
Expand Down Expand Up @@ -96,12 +98,20 @@ func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
sort.Ints(p)
ta.partitions <- p

if ta.RetError != nil {
return ta.RetError
ta.mu.Lock()
defer ta.mu.Unlock()
if ta.recvError != nil {
return ta.recvError
}
return nil
}

func (ta *testAssigner) SetReceiveError(err error) {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.recvError = err
}

func (ta *testAssigner) NextPartitions() []int {
select {
case <-time.After(serviceTestWaitTimeout):
Expand Down Expand Up @@ -186,15 +196,16 @@ func TestAssignerHandlePartitionFailure(t *testing.T) {

asn := newTestAssigner(t, subscription)
// Simulates the assigningSubscriber discarding assignments.
asn.RetError = errors.New("subscriber shutting down")
wantErr := errors.New("subscriber shutting down")
asn.SetReceiveError(wantErr)

if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) {
t.Errorf("Partition assignments: got %v, want %v", got, want)
}
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError)
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}
10 changes: 9 additions & 1 deletion pubsublite/internal/wire/errors.go
Expand Up @@ -13,13 +13,21 @@

package wire

import "errors"
import (
"errors"
"fmt"
)

// Errors exported from this package.
var (
// ErrOverflow indicates that the publish buffers have overflowed. See
// comments for PublishSettings.BufferedByteLimit.
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")

// ErrOversizedMessage indicates that the user published a message over the
// allowed serialized byte size limit. It is wrapped in another error.
ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)

// ErrServiceUninitialized indicates that a service (e.g. publisher or
// subscriber) cannot perform an operation because it is uninitialized.
ErrServiceUninitialized = errors.New("pubsublite: service must be started")
Expand Down
5 changes: 3 additions & 2 deletions pubsublite/internal/wire/publish_batcher.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -109,8 +110,8 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat
func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error {
msgSize := proto.Size(msg)
switch {
case msgSize > MaxPublishMessageBytes:
return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes)
case msgSize > MaxPublishRequestBytes:
return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage)
case msgSize > b.availableBufferBytes:
return ErrOverflow
}
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/internal/wire/publish_batcher_test.go
Expand Up @@ -146,7 +146,7 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver
}

func TestPublishBatcherAddMessage(t *testing.T) {
const initAvailableBytes = MaxPublishMessageBytes + 1
const initAvailableBytes = MaxPublishRequestBytes
settings := DefaultPublishSettings
settings.BufferedByteLimit = initAvailableBytes

Expand Down Expand Up @@ -178,8 +178,8 @@ func TestPublishBatcherAddMessage(t *testing.T) {
})

t.Run("oversized message", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg)
}
})
Expand Down
4 changes: 0 additions & 4 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -24,10 +24,6 @@ const (
// batched in a single publish request.
MaxPublishRequestCount = 1000

// MaxPublishMessageBytes is the maximum allowed serialized size of a single
// Pub/Sub message in bytes.
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
Expand Down
74 changes: 74 additions & 0 deletions pubsublite/ps/example_test.go
@@ -0,0 +1,74 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps_test

import (
"context"
"fmt"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/ps"
)

func ExamplePublisherClient_Publish() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}

func ExamplePublisherClient_Error() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
if err == ps.ErrPublisherStopped {
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
}
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}
168 changes: 168 additions & 0 deletions pubsublite/ps/publisher.go
@@ -0,0 +1,168 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps

import (
"context"
"sync"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"

ipubsub "cloud.google.com/go/internal/pubsub"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
ErrOverflow = bundler.ErrOverflow

// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishRequestBytes.
ErrOversizedMessage = bundler.ErrOversizedItem

// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped. PublisherClient.Error()
// returns the error that caused the publisher client to terminate (if any).
ErrPublisherStopped = wire.ErrServiceStopped
)

// translateError transforms a subset of errors to what would be returned by the
// pubsub package.
func translateError(err error) error {
if xerrors.Is(err, wire.ErrOversizedMessage) {
return ErrOversizedMessage
}
if xerrors.Is(err, wire.ErrOverflow) {
return ErrOverflow
}
return err
}

// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
settings PublishSettings
wirePub wire.Publisher

// Fields below must be guarded with mutex.
mu sync.Mutex
err error
}

// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish
// messages to a given topic.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) {
region, err := pubsublite.ZoneToRegion(topic.Zone)
if err != nil {
return nil, err
}

// Note: ctx is not used to create the wire publisher, because if it is
// cancelled, the publisher will not be able to perform graceful shutdown
// (e.g. flush pending messages).
wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...)
if err != nil {
return nil, err
}
wirePub.Start()
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
}

// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server.
//
// Once Stop() has been called or the publisher has failed permanently due to an
// error, future calls to Publish will immediately return a PublishResult with
// error ErrPublisherStopped. Error() returns the error that caused the
// publisher to terminate.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
result := ipubsub.NewPublishResult()
msgpb := new(pb.PubSubMessage)
if err := p.transformMessage(msg, msgpb); err != nil {
ipubsub.SetPublishResult(result, "", err)
p.setError(err)
p.wirePub.Stop()
return result
}

p.wirePub.Publish(msgpb, func(pm *publish.Metadata, err error) {
err = translateError(err)
if pm != nil {
ipubsub.SetPublishResult(result, pm.String(), err)
} else {
ipubsub.SetPublishResult(result, "", err)
}
})
return result
}

// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
}

// Error returns the error that caused the publisher client to terminate. It
// may be nil if Stop() was called.
func (p *PublisherClient) Error() error {
p.mu.Lock()
defer p.mu.Unlock()

if p.err != nil {
return p.err
}
return p.wirePub.Error()
}

func (p *PublisherClient) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()

// Don't clobber original error.
if p.err == nil {
p.err = err
}
}

func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error {
if p.settings.MessageTransformer != nil {
return p.settings.MessageTransformer(from, to)
}

keyExtractor := p.settings.KeyExtractor
if keyExtractor == nil {
keyExtractor = extractOrderingKey
}
return transformPublishedMessage(from, to, keyExtractor)
}

0 comments on commit 1648ea0

Please sign in to comment.