Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): publisher client #3303

Merged
merged 13 commits into from Dec 23, 2020
1 change: 1 addition & 0 deletions pubsublite/go.mod
Expand Up @@ -9,6 +9,7 @@ require (
github.com/google/go-cmp v0.5.4
github.com/google/uuid v1.1.2
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a
google.golang.org/grpc v1.34.0
Expand Down
23 changes: 17 additions & 6 deletions pubsublite/internal/wire/assigner_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -62,7 +63,8 @@ func fakeGenerateUUID() (uuid.UUID, error) {
// testAssigner wraps an assigner for ease of testing.
type testAssigner struct {
// Fake error to simulate receiver unable to handle assignment.
RetError error
recvError error
mu sync.Mutex

t *testing.T
asn *assigner
Expand Down Expand Up @@ -96,12 +98,20 @@ func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
sort.Ints(p)
ta.partitions <- p

if ta.RetError != nil {
return ta.RetError
ta.mu.Lock()
defer ta.mu.Unlock()
if ta.recvError != nil {
return ta.recvError
}
return nil
}

func (ta *testAssigner) SetReceiveError(err error) {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.recvError = err
}

func (ta *testAssigner) NextPartitions() []int {
select {
case <-time.After(serviceTestWaitTimeout):
Expand Down Expand Up @@ -186,15 +196,16 @@ func TestAssignerHandlePartitionFailure(t *testing.T) {

asn := newTestAssigner(t, subscription)
// Simulates the assigningSubscriber discarding assignments.
asn.RetError = errors.New("subscriber shutting down")
wantErr := errors.New("subscriber shutting down")
asn.SetReceiveError(wantErr)

if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) {
t.Errorf("Partition assignments: got %v, want %v", got, want)
}
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError)
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}
10 changes: 9 additions & 1 deletion pubsublite/internal/wire/errors.go
Expand Up @@ -13,13 +13,21 @@

package wire

import "errors"
import (
"errors"
"fmt"
)

// Errors exported from this package.
var (
// ErrOverflow indicates that the publish buffers have overflowed. See
// comments for PublishSettings.BufferedByteLimit.
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")

// ErrOversizedMessage indicates that the user published a message over the
// allowed serialized byte size limit. It is wrapped in another error.
ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)

// ErrServiceUninitialized indicates that a service (e.g. publisher or
// subscriber) cannot perform an operation because it is uninitialized.
ErrServiceUninitialized = errors.New("pubsublite: service must be started")
Expand Down
5 changes: 3 additions & 2 deletions pubsublite/internal/wire/publish_batcher.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -109,8 +110,8 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat
func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error {
msgSize := proto.Size(msg)
switch {
case msgSize > MaxPublishMessageBytes:
return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes)
case msgSize > MaxPublishRequestBytes:
return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage)
case msgSize > b.availableBufferBytes:
return ErrOverflow
}
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/internal/wire/publish_batcher_test.go
Expand Up @@ -146,7 +146,7 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver
}

func TestPublishBatcherAddMessage(t *testing.T) {
const initAvailableBytes = MaxPublishMessageBytes + 1
const initAvailableBytes = MaxPublishRequestBytes
settings := DefaultPublishSettings
settings.BufferedByteLimit = initAvailableBytes

Expand Down Expand Up @@ -178,8 +178,8 @@ func TestPublishBatcherAddMessage(t *testing.T) {
})

t.Run("oversized message", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg)
}
})
Expand Down
4 changes: 0 additions & 4 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -24,10 +24,6 @@ const (
// batched in a single publish request.
MaxPublishRequestCount = 1000

// MaxPublishMessageBytes is the maximum allowed serialized size of a single
// Pub/Sub message in bytes.
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
Expand Down
74 changes: 74 additions & 0 deletions pubsublite/ps/example_test.go
@@ -0,0 +1,74 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps_test

import (
"context"
"fmt"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/ps"
)

func ExamplePublisherClient_Publish() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}

func ExamplePublisherClient_Error() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
if err == ps.ErrPublisherStopped {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
}
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}
168 changes: 168 additions & 0 deletions pubsublite/ps/publisher.go
@@ -0,0 +1,168 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps

import (
"context"
"sync"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"

ipubsub "cloud.google.com/go/internal/pubsub"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
ErrOverflow = bundler.ErrOverflow

// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishRequestBytes.
ErrOversizedMessage = bundler.ErrOversizedItem

// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped. PublisherClient.Error()
// returns the error that caused the publisher client to terminate (if any).
ErrPublisherStopped = wire.ErrServiceStopped
)

// translateError transforms a subset of errors to what would be returned by the
// pubsub package.
func translateError(err error) error {
if xerrors.Is(err, wire.ErrOversizedMessage) {
return ErrOversizedMessage
}
if xerrors.Is(err, wire.ErrOverflow) {
return ErrOverflow
}
return err
}

// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
settings PublishSettings
wirePub wire.Publisher

// Fields below must be guarded with mutex.
mu sync.Mutex
err error
}

// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish
// messages to a given topic.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) {
region, err := pubsublite.ZoneToRegion(topic.Zone)
if err != nil {
return nil, err
}

// Note: ctx is not used to create the wire publisher, because if it is
// cancelled, the publisher will not be able to perform graceful shutdown
// (e.g. flush pending messages).
wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...)
if err != nil {
return nil, err
}
wirePub.Start()
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
}

// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server.
//
// Once Stop() has been called or the publisher has failed permanently due to an
// error, future calls to Publish will immediately return a PublishResult with
// error ErrPublisherStopped. Error() returns the error that caused the
// publisher to terminate.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
result := ipubsub.NewPublishResult()
msgpb := new(pb.PubSubMessage)
if err := p.transformMessage(msg, msgpb); err != nil {
ipubsub.SetPublishResult(result, "", err)
p.setError(err)
p.wirePub.Stop()
return result
}

p.wirePub.Publish(msgpb, func(pm *publish.Metadata, err error) {
err = translateError(err)
if pm != nil {
ipubsub.SetPublishResult(result, pm.String(), err)
} else {
ipubsub.SetPublishResult(result, "", err)
}
})
return result
}

// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
}

// Error returns the error that caused the publisher client to terminate. It
// may be nil if Stop() was called.
func (p *PublisherClient) Error() error {
p.mu.Lock()
defer p.mu.Unlock()

if p.err != nil {
return p.err
}
return p.wirePub.Error()
}

func (p *PublisherClient) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()

// Don't clobber original error.
if p.err == nil {
p.err = err
}
}

func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error {
if p.settings.MessageTransformer != nil {
return p.settings.MessageTransformer(from, to)
}

keyExtractor := p.settings.KeyExtractor
if keyExtractor == nil {
keyExtractor = extractOrderingKey
}
return transformPublishedMessage(from, to, keyExtractor)
}