Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): improve handling of backend unavailability #3846

Merged
merged 5 commits into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions pubsublite/internal/wire/errors.go
Expand Up @@ -42,6 +42,11 @@ var (
// cannot perform an operation because it has stopped or is in the process of
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")

// ErrBackendUnavailable indicates that the backend service has been
// unavailable for a period of time. The timeout can be configured using
// PublishSettings.Timeout or ReceiveSettings.Timeout.
ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable")
)

func wrapError(context, resource string, err error) error {
Expand Down
20 changes: 12 additions & 8 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -22,7 +22,9 @@ import (

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -50,26 +52,24 @@ func newStreamRetryer(timeout time.Duration) *streamRetryer {
}
}

func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetrySend(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableSendError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetryRecv(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableRecvError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) ExceededDeadline() bool {
return time.Now().After(r.deadline)
}

func isRetryableSendCode(code codes.Code) bool {
switch code {
// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
Expand Down Expand Up @@ -135,6 +135,10 @@ const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
// Keep inactive connections alive.
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
Time: 5 * time.Minute,
})),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -82,7 +82,7 @@ var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Minute,
Timeout: 7 * 24 * time.Hour, // 1 week
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
Expand Down Expand Up @@ -146,7 +146,7 @@ type ReceiveSettings struct {
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
Timeout: 60 * time.Minute,
Timeout: 7 * 24 * time.Hour, // 1 week
}

func validateReceiveSettings(settings ReceiveSettings) error {
Expand Down
29 changes: 15 additions & 14 deletions pubsublite/internal/wire/streams.go
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"golang.org/x/xerrors"
"google.golang.org/grpc"

gax "github.com/googleapis/gax-go/v2"
Expand All @@ -32,10 +33,10 @@ import (
type streamStatus int

const (
streamUninitialized streamStatus = 0
streamReconnecting streamStatus = 1
streamConnected streamStatus = 2
streamTerminated streamStatus = 3
streamUninitialized streamStatus = iota
streamReconnecting
streamConnected
streamTerminated
)

type initialResponseRequired bool
Expand Down Expand Up @@ -257,14 +258,6 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF
r := newStreamRetryer(rs.timeout)
for {
backoff, shouldRetry := func() (time.Duration, bool) {
defer func() {
if err != nil && cancelFunc != nil {
cancelFunc()
cancelFunc = nil
newStream = nil
}
}()

var cctx context.Context
cctx, cancelFunc = context.WithCancel(rs.ctx)
// Store the cancel func to quickly cancel reconnecting if the stream is
Expand All @@ -286,6 +279,7 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF
}
if err = rs.handler.validateInitialResponse(response); err != nil {
// An unexpected initial response from the server is a permanent error.
cancelFunc()
return 0, false
}
}
Expand All @@ -294,10 +288,17 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF
return 0, false
}()

if !shouldRetry {
if (shouldRetry || err != nil) && cancelFunc != nil {
// Ensure that streams aren't leaked.
cancelFunc()
cancelFunc = nil
newStream = nil
}
if !shouldRetry || rs.Status() == streamTerminated {
break
}
if rs.Status() == streamTerminated {
if r.ExceededDeadline() {
err = xerrors.Errorf("%v: %w", err, ErrBackendUnavailable)
break
}
if err = gax.Sleep(rs.ctx, backoff); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pubsublite/internal/wire/streams_test.go
Expand Up @@ -22,6 +22,7 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -308,8 +309,8 @@ func TestRetryableStreamConnectTimeout(t *testing.T) {
if pub.Stream.currentStream() != nil {
t.Error("Client stream should be nil")
}
if gotErr := pub.Stream.Error(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, wantErr)
if gotErr := pub.Stream.Error(); !xerrors.Is(gotErr, ErrBackendUnavailable) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, ErrBackendUnavailable)
}
}

Expand Down
138 changes: 125 additions & 13 deletions pubsublite/pscompat/example_test.go
Expand Up @@ -16,10 +16,13 @@ package pscompat_test
import (
"context"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)

func ExamplePublisherClient_Publish() {
Expand All @@ -42,8 +45,9 @@ func ExamplePublisherClient_Publish() {
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
publishFailed = true
// TODO: Handle error.
publishFailed = true
continue
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
Expand All @@ -64,10 +68,11 @@ func ExamplePublisherClient_Publish() {
func ExamplePublisherClient_Publish_batchingSettings() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
settings := pscompat.DefaultPublishSettings
settings.DelayThreshold = 50 * time.Millisecond
settings.CountThreshold = 200
settings.BufferedByteLimit = 5e8
settings := pscompat.PublishSettings{
DelayThreshold: 50 * time.Millisecond,
CountThreshold: 200,
BufferedByteLimit: 5e8,
}
publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
if err != nil {
// TODO: Handle error.
Expand All @@ -85,8 +90,9 @@ func ExamplePublisherClient_Publish_batchingSettings() {
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
publishFailed = true
// TODO: Handle error.
publishFailed = true
continue
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
Expand All @@ -100,6 +106,71 @@ func ExamplePublisherClient_Publish_batchingSettings() {
}
}

// This example illustrates how to handle various publishing errors. Some errors
// can be automatically handled (e.g. backend unavailable and buffer overflow),
// while others are fatal errors that should be inspected.
// If the application has a low tolerance to backend unavailability, set a lower
// PublishSettings.Timeout value to detect and alert.
func ExamplePublisherClient_Publish_errorHandling() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
settings := pscompat.PublishSettings{
// The PublisherClient will terminate when it cannot connect to backends for
// more than 10 minutes.
Timeout: 10 * time.Minute,
// Sets a conservative publish buffer byte limit, per partition.
BufferedByteLimit: 1e8,
}
publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var toRepublish []*pubsub.Message
var mu sync.Mutex
g := new(errgroup.Group)

for i := 0; i < 10; i++ {
msg := &pubsub.Message{
Data: []byte(fmt.Sprintf("message-%d", i)),
}
result := publisher.Publish(ctx, msg)

g.Go(func() error {
id, err := result.Get(ctx)
if err != nil {
// NOTE: A failed PublishResult indicates that the publisher client has
// permanently terminated. A new publisher client instance must be
// created to republish failed messages.
fmt.Printf("Publish error: %v\n", err)
// Oversized messages cannot be published.
if !xerrors.Is(err, pscompat.ErrOversizedMessage) {
mu.Lock()
toRepublish = append(toRepublish, msg)
mu.Unlock()
}
return err
}
fmt.Printf("Published a message with a message ID: %s\n", id)
return nil
})
}
if err := g.Wait(); err != nil {
fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
switch {
case xerrors.Is(publisher.Error(), pscompat.ErrBackendUnavailable):
// TODO: Create a new publisher client to republish failed messages.
case xerrors.Is(publisher.Error(), pscompat.ErrOverflow):
// TODO: Create a new publisher client to republish failed messages.
// Throttle publishing. Note that backend unavailability can also cause
// buffer overflow before the ErrBackendUnavailable error.
default:
// TODO: Inspect and handle fatal error.
}
}
}

func ExampleSubscriberClient_Receive() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
Expand All @@ -117,10 +188,49 @@ func ExampleSubscriberClient_Receive() {
// TODO: Handle error.
}

// Call cancel from callback, or another goroutine.
// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}

// If the application has a low tolerance to backend unavailability, set a lower
// ReceiveSettings.Timeout value to detect and alert.
func ExampleSubscriberClient_Receive_errorHandling() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.ReceiveSettings{
// The SubscriberClient will terminate when it cannot connect to backends
// for more than 5 minutes.
Timeout: 5 * time.Minute,
}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
}

for {
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil {
fmt.Printf("Subscriber client stopped receiving due to error: %v\n", err)
if xerrors.Is(err, pscompat.ErrBackendUnavailable) {
// TODO: Alert if necessary. Receive can be retried.
} else {
// TODO: Handle fatal error.
break
}
}

// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}
}

// This example shows how to throttle SubscriberClient.Receive, which aims for
// high throughput by default. By limiting the number of messages and/or bytes
// being processed at once, you can bound your program's resource consumption.
Expand All @@ -129,9 +239,10 @@ func ExampleSubscriberClient_Receive() {
func ExampleSubscriberClient_Receive_maxOutstanding() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.DefaultReceiveSettings
settings.MaxOutstandingMessages = 5
settings.MaxOutstandingBytes = 10e6
settings := pscompat.ReceiveSettings{
MaxOutstandingMessages: 5,
MaxOutstandingBytes: 10e6,
}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
Expand All @@ -158,9 +269,10 @@ func ExampleSubscriberClient_Receive_maxOutstanding() {
func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.DefaultReceiveSettings
// NOTE: The corresponding topic must have 2 or more partitions.
settings.Partitions = []int{0, 1}
settings := pscompat.ReceiveSettings{
// NOTE: The corresponding topic must have 2 or more partitions.
Partitions: []int{0, 1},
}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
Expand Down
6 changes: 6 additions & 0 deletions pubsublite/pscompat/publisher.go
Expand Up @@ -39,6 +39,12 @@ var (
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any). Use errors.Is for comparing errors.
ErrPublisherStopped = wire.ErrServiceStopped

// ErrBackendUnavailable indicates that the backend service has been
// unavailable for a period of time. The timeout can be configured using
// PublishSettings.Timeout or ReceiveSettings.Timeout. Use errors.Is for
// comparing errors.
ErrBackendUnavailable = wire.ErrBackendUnavailable
)

// PublisherClient is a Pub/Sub Lite client to publish messages to a given
Expand Down