Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix(pubsublite): improve handling of backend unavailability (#3846)
Defined ErrBackendUnavailable to allow users to detect and handle backend unavailability. Increased default values of some settings. Added examples for handling publish and subscribe errors.
  • Loading branch information
tmdiep committed Mar 25, 2021
1 parent bc083b6 commit db31457
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 47 deletions.
5 changes: 5 additions & 0 deletions pubsublite/internal/wire/errors.go
Expand Up @@ -42,6 +42,11 @@ var (
// cannot perform an operation because it has stopped or is in the process of
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")

// ErrBackendUnavailable indicates that the backend service has been
// unavailable for a period of time. The timeout can be configured using
// PublishSettings.Timeout or ReceiveSettings.Timeout.
ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable")
)

func wrapError(context, resource string, err error) error {
Expand Down
20 changes: 12 additions & 8 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -22,7 +22,9 @@ import (

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -50,26 +52,24 @@ func newStreamRetryer(timeout time.Duration) *streamRetryer {
}
}

func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetrySend(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableSendError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetryRecv(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableRecvError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) ExceededDeadline() bool {
return time.Now().After(r.deadline)
}

func isRetryableSendCode(code codes.Code) bool {
switch code {
// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
Expand Down Expand Up @@ -135,6 +135,10 @@ const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
// Keep inactive connections alive.
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
})),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -82,7 +82,7 @@ var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Minute,
Timeout: 7 * 24 * time.Hour, // 1 week
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
Expand Down Expand Up @@ -146,7 +146,7 @@ type ReceiveSettings struct {
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
Timeout: 60 * time.Minute,
Timeout: 7 * 24 * time.Hour, // 1 week
}

func validateReceiveSettings(settings ReceiveSettings) error {
Expand Down
29 changes: 15 additions & 14 deletions pubsublite/internal/wire/streams.go
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"golang.org/x/xerrors"
"google.golang.org/grpc"

gax "github.com/googleapis/gax-go/v2"
Expand All @@ -32,10 +33,10 @@ import (
type streamStatus int

const (
streamUninitialized streamStatus = 0
streamReconnecting streamStatus = 1
streamConnected streamStatus = 2
streamTerminated streamStatus = 3
streamUninitialized streamStatus = iota
streamReconnecting
streamConnected
streamTerminated
)

type initialResponseRequired bool
Expand Down Expand Up @@ -257,14 +258,6 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF
r := newStreamRetryer(rs.timeout)
for {
backoff, shouldRetry := func() (time.Duration, bool) {
defer func() {
if err != nil && cancelFunc != nil {
cancelFunc()
cancelFunc = nil
newStream = nil
}
}()

var cctx context.Context
cctx, cancelFunc = context.WithCancel(rs.ctx)
// Store the cancel func to quickly cancel reconnecting if the stream is
Expand All @@ -286,6 +279,7 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF
}
if err = rs.handler.validateInitialResponse(response); err != nil {
// An unexpected initial response from the server is a permanent error.
cancelFunc()
return 0, false
}
}
Expand All @@ -294,10 +288,17 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF
return 0, false
}()

if !shouldRetry {
if (shouldRetry || err != nil) && cancelFunc != nil {
// Ensure that streams aren't leaked.
cancelFunc()
cancelFunc = nil
newStream = nil
}
if !shouldRetry || rs.Status() == streamTerminated {
break
}
if rs.Status() == streamTerminated {
if r.ExceededDeadline() {
err = xerrors.Errorf("%v: %w", err, ErrBackendUnavailable)
break
}
if err = gax.Sleep(rs.ctx, backoff); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pubsublite/internal/wire/streams_test.go
Expand Up @@ -22,6 +22,7 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -308,8 +309,8 @@ func TestRetryableStreamConnectTimeout(t *testing.T) {
if pub.Stream.currentStream() != nil {
t.Error("Client stream should be nil")
}
if gotErr := pub.Stream.Error(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, wantErr)
if gotErr := pub.Stream.Error(); !xerrors.Is(gotErr, ErrBackendUnavailable) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, ErrBackendUnavailable)
}
}

Expand Down
138 changes: 125 additions & 13 deletions pubsublite/pscompat/example_test.go
Expand Up @@ -16,10 +16,13 @@ package pscompat_test
import (
"context"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)

func ExamplePublisherClient_Publish() {
Expand All @@ -42,8 +45,9 @@ func ExamplePublisherClient_Publish() {
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
publishFailed = true
// TODO: Handle error.
publishFailed = true
continue
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
Expand All @@ -64,10 +68,11 @@ func ExamplePublisherClient_Publish() {
func ExamplePublisherClient_Publish_batchingSettings() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
settings := pscompat.DefaultPublishSettings
settings.DelayThreshold = 50 * time.Millisecond
settings.CountThreshold = 200
settings.BufferedByteLimit = 5e8
settings := pscompat.PublishSettings{
DelayThreshold: 50 * time.Millisecond,
CountThreshold: 200,
BufferedByteLimit: 5e8,
}
publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
if err != nil {
// TODO: Handle error.
Expand All @@ -85,8 +90,9 @@ func ExamplePublisherClient_Publish_batchingSettings() {
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
publishFailed = true
// TODO: Handle error.
publishFailed = true
continue
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
Expand All @@ -100,6 +106,71 @@ func ExamplePublisherClient_Publish_batchingSettings() {
}
}

// This example illustrates how to handle various publishing errors. Some errors
// can be automatically handled (e.g. backend unavailable and buffer overflow),
// while others are fatal errors that should be inspected.
// If the application has a low tolerance to backend unavailability, set a lower
// PublishSettings.Timeout value to detect and alert.
func ExamplePublisherClient_Publish_errorHandling() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
settings := pscompat.PublishSettings{
// The PublisherClient will terminate when it cannot connect to backends for
// more than 10 minutes.
Timeout: 10 * time.Minute,
// Sets a conservative publish buffer byte limit, per partition.
BufferedByteLimit: 1e8,
}
publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var toRepublish []*pubsub.Message
var mu sync.Mutex
g := new(errgroup.Group)

for i := 0; i < 10; i++ {
msg := &pubsub.Message{
Data: []byte(fmt.Sprintf("message-%d", i)),
}
result := publisher.Publish(ctx, msg)

g.Go(func() error {
id, err := result.Get(ctx)
if err != nil {
// NOTE: A failed PublishResult indicates that the publisher client has
// permanently terminated. A new publisher client instance must be
// created to republish failed messages.
fmt.Printf("Publish error: %v\n", err)
// Oversized messages cannot be published.
if !xerrors.Is(err, pscompat.ErrOversizedMessage) {
mu.Lock()
toRepublish = append(toRepublish, msg)
mu.Unlock()
}
return err
}
fmt.Printf("Published a message with a message ID: %s\n", id)
return nil
})
}
if err := g.Wait(); err != nil {
fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
switch {
case xerrors.Is(publisher.Error(), pscompat.ErrBackendUnavailable):
// TODO: Create a new publisher client to republish failed messages.
case xerrors.Is(publisher.Error(), pscompat.ErrOverflow):
// TODO: Create a new publisher client to republish failed messages.
// Throttle publishing. Note that backend unavailability can also cause
// buffer overflow before the ErrBackendUnavailable error.
default:
// TODO: Inspect and handle fatal error.
}
}
}

func ExampleSubscriberClient_Receive() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
Expand All @@ -117,10 +188,49 @@ func ExampleSubscriberClient_Receive() {
// TODO: Handle error.
}

// Call cancel from callback, or another goroutine.
// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}

// If the application has a low tolerance to backend unavailability, set a lower
// ReceiveSettings.Timeout value to detect and alert.
func ExampleSubscriberClient_Receive_errorHandling() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.ReceiveSettings{
// The SubscriberClient will terminate when it cannot connect to backends
// for more than 5 minutes.
Timeout: 5 * time.Minute,
}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
}

for {
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil {
fmt.Printf("Subscriber client stopped receiving due to error: %v\n", err)
if xerrors.Is(err, pscompat.ErrBackendUnavailable) {
// TODO: Alert if necessary. Receive can be retried.
} else {
// TODO: Handle fatal error.
break
}
}

// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}
}

// This example shows how to throttle SubscriberClient.Receive, which aims for
// high throughput by default. By limiting the number of messages and/or bytes
// being processed at once, you can bound your program's resource consumption.
Expand All @@ -129,9 +239,10 @@ func ExampleSubscriberClient_Receive() {
func ExampleSubscriberClient_Receive_maxOutstanding() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.DefaultReceiveSettings
settings.MaxOutstandingMessages = 5
settings.MaxOutstandingBytes = 10e6
settings := pscompat.ReceiveSettings{
MaxOutstandingMessages: 5,
MaxOutstandingBytes: 10e6,
}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
Expand All @@ -158,9 +269,10 @@ func ExampleSubscriberClient_Receive_maxOutstanding() {
func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.DefaultReceiveSettings
// NOTE: The corresponding topic must have 2 or more partitions.
settings.Partitions = []int{0, 1}
settings := pscompat.ReceiveSettings{
// NOTE: The corresponding topic must have 2 or more partitions.
Partitions: []int{0, 1},
}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
Expand Down
6 changes: 6 additions & 0 deletions pubsublite/pscompat/publisher.go
Expand Up @@ -39,6 +39,12 @@ var (
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any). Use errors.Is for comparing errors.
ErrPublisherStopped = wire.ErrServiceStopped

// ErrBackendUnavailable indicates that the backend service has been
// unavailable for a period of time. The timeout can be configured using
// PublishSettings.Timeout or ReceiveSettings.Timeout. Use errors.Is for
// comparing errors.
ErrBackendUnavailable = wire.ErrBackendUnavailable
)

// PublisherClient is a Pub/Sub Lite client to publish messages to a given
Expand Down

0 comments on commit db31457

Please sign in to comment.