From db31457cebdcd1c6370953e0360acd227567496d Mon Sep 17 00:00:00 2001 From: tmdiep Date: Fri, 26 Mar 2021 09:50:23 +1100 Subject: [PATCH] fix(pubsublite): improve handling of backend unavailability (#3846) Defined ErrBackendUnavailable to allow users to detect and handle backend unavailability. Increased default values of some settings. Added examples for handling publish and subscribe errors. --- pubsublite/internal/wire/errors.go | 5 + pubsublite/internal/wire/rpc.go | 20 ++-- pubsublite/internal/wire/settings.go | 4 +- pubsublite/internal/wire/streams.go | 29 ++--- pubsublite/internal/wire/streams_test.go | 5 +- pubsublite/pscompat/example_test.go | 138 ++++++++++++++++++++--- pubsublite/pscompat/publisher.go | 6 + pubsublite/pscompat/settings.go | 22 ++-- 8 files changed, 182 insertions(+), 47 deletions(-) diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go index ac4bfe797b6..dfb2ca108d2 100644 --- a/pubsublite/internal/wire/errors.go +++ b/pubsublite/internal/wire/errors.go @@ -42,6 +42,11 @@ var ( // cannot perform an operation because it has stopped or is in the process of // stopping. ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping") + + // ErrBackendUnavailable indicates that the backend service has been + // unavailable for a period of time. The timeout can be configured using + // PublishSettings.Timeout or ReceiveSettings.Timeout. 
+ ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable") ) func wrapError(context, resource string, err error) error { diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index fdca3e8b8b1..e5c6dfb9359 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -22,7 +22,9 @@ import ( "google.golang.org/api/option" "google.golang.org/api/option/internaloption" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -50,26 +52,24 @@ func newStreamRetryer(timeout time.Duration) *streamRetryer { } } -func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) { - if time.Now().After(r.deadline) { - return 0, false - } +func (r *streamRetryer) RetrySend(err error) (backoff time.Duration, shouldRetry bool) { if isRetryableSendError(err) { return r.bo.Pause(), true } return 0, false } -func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) { - if time.Now().After(r.deadline) { - return 0, false - } +func (r *streamRetryer) RetryRecv(err error) (backoff time.Duration, shouldRetry bool) { if isRetryableRecvError(err) { return r.bo.Pause(), true } return 0, false } +func (r *streamRetryer) ExceededDeadline() bool { + return time.Now().After(r.deadline) +} + func isRetryableSendCode(code codes.Code) bool { switch code { // Client-side errors that occur during grpc.ClientStream.SendMsg() have a @@ -135,6 +135,10 @@ const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" func defaultClientOptions(region string) []option.ClientOption { return []option.ClientOption{ internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint), + // Keep inactive connections alive. 
+ option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 5 * time.Minute, + })), } } diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index 6bfe7dc3b11..3fa130a86c9 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -82,7 +82,7 @@ var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, CountThreshold: 100, ByteThreshold: 1e6, - Timeout: 60 * time.Minute, + Timeout: 7 * 24 * time.Hour, // 1 week // By default set to a high limit that is not likely to occur, but prevents // OOM errors in clients. BufferedByteLimit: 1 << 30, // 1 GiB @@ -146,7 +146,7 @@ type ReceiveSettings struct { var DefaultReceiveSettings = ReceiveSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: 1e9, - Timeout: 60 * time.Minute, + Timeout: 7 * 24 * time.Hour, // 1 week } func validateReceiveSettings(settings ReceiveSettings) error { diff --git a/pubsublite/internal/wire/streams.go b/pubsublite/internal/wire/streams.go index e97d3dc2e92..752acb921c8 100644 --- a/pubsublite/internal/wire/streams.go +++ b/pubsublite/internal/wire/streams.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "golang.org/x/xerrors" "google.golang.org/grpc" gax "github.com/googleapis/gax-go/v2" @@ -32,10 +33,10 @@ import ( type streamStatus int const ( - streamUninitialized streamStatus = 0 - streamReconnecting streamStatus = 1 - streamConnected streamStatus = 2 - streamTerminated streamStatus = 3 + streamUninitialized streamStatus = iota + streamReconnecting + streamConnected + streamTerminated ) type initialResponseRequired bool @@ -257,14 +258,6 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF r := newStreamRetryer(rs.timeout) for { backoff, shouldRetry := func() (time.Duration, bool) { - defer func() { - if err != nil && cancelFunc != nil { - cancelFunc() - cancelFunc = nil - newStream = nil - } - }() - var cctx context.Context cctx, 
cancelFunc = context.WithCancel(rs.ctx) // Store the cancel func to quickly cancel reconnecting if the stream is @@ -286,6 +279,7 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF } if err = rs.handler.validateInitialResponse(response); err != nil { // An unexpected initial response from the server is a permanent error. + cancelFunc() return 0, false } } @@ -294,10 +288,17 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelF return 0, false }() - if !shouldRetry { + if (shouldRetry || err != nil) && cancelFunc != nil { + // Ensure that streams aren't leaked. + cancelFunc() + cancelFunc = nil + newStream = nil + } + if !shouldRetry || rs.Status() == streamTerminated { break } - if rs.Status() == streamTerminated { + if r.ExceededDeadline() { + err = xerrors.Errorf("%v: %w", err, ErrBackendUnavailable) break } if err = gax.Sleep(rs.ctx, backoff); err != nil { diff --git a/pubsublite/internal/wire/streams_test.go b/pubsublite/internal/wire/streams_test.go index 968fd48f28c..2079f8a8882 100644 --- a/pubsublite/internal/wire/streams_test.go +++ b/pubsublite/internal/wire/streams_test.go @@ -22,6 +22,7 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/pubsublite/internal/test" + "golang.org/x/xerrors" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -308,8 +309,8 @@ func TestRetryableStreamConnectTimeout(t *testing.T) { if pub.Stream.currentStream() != nil { t.Error("Client stream should be nil") } - if gotErr := pub.Stream.Error(); !test.ErrorEqual(gotErr, wantErr) { - t.Errorf("Stream final err: got (%v), want (%v)", gotErr, wantErr) + if gotErr := pub.Stream.Error(); !xerrors.Is(gotErr, ErrBackendUnavailable) { + t.Errorf("Stream final err: got (%v), want (%v)", gotErr, ErrBackendUnavailable) } } diff --git a/pubsublite/pscompat/example_test.go b/pubsublite/pscompat/example_test.go index 7421819c6ec..7ca12e680ad 100644 --- 
a/pubsublite/pscompat/example_test.go +++ b/pubsublite/pscompat/example_test.go @@ -16,10 +16,13 @@ package pscompat_test import ( "context" "fmt" + "sync" "time" "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsublite/pscompat" + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" ) func ExamplePublisherClient_Publish() { @@ -42,8 +45,9 @@ func ExamplePublisherClient_Publish() { for _, r := range results { id, err := r.Get(ctx) if err != nil { - publishFailed = true // TODO: Handle error. + publishFailed = true + continue } fmt.Printf("Published a message with a message ID: %s\n", id) } @@ -64,10 +68,11 @@ func ExamplePublisherClient_Publish() { func ExamplePublisherClient_Publish_batchingSettings() { ctx := context.Background() const topic = "projects/my-project/locations/zone/topics/my-topic" - settings := pscompat.DefaultPublishSettings - settings.DelayThreshold = 50 * time.Millisecond - settings.CountThreshold = 200 - settings.BufferedByteLimit = 5e8 + settings := pscompat.PublishSettings{ + DelayThreshold: 50 * time.Millisecond, + CountThreshold: 200, + BufferedByteLimit: 5e8, + } publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings) if err != nil { // TODO: Handle error. @@ -85,8 +90,9 @@ func ExamplePublisherClient_Publish_batchingSettings() { for _, r := range results { id, err := r.Get(ctx) if err != nil { - publishFailed = true // TODO: Handle error. + publishFailed = true + continue } fmt.Printf("Published a message with a message ID: %s\n", id) } @@ -100,6 +106,71 @@ func ExamplePublisherClient_Publish_batchingSettings() { } } +// This example illustrates how to handle various publishing errors. Some errors +// can be automatically handled (e.g. backend unavailable and buffer overflow), +// while others are fatal errors that should be inspected. +// If the application has a low tolerance to backend unavailability, set a lower +// PublishSettings.Timeout value to detect and alert. 
+func ExamplePublisherClient_Publish_errorHandling() { + ctx := context.Background() + const topic = "projects/my-project/locations/zone/topics/my-topic" + settings := pscompat.PublishSettings{ + // The PublisherClient will terminate when it cannot connect to backends for + // more than 10 minutes. + Timeout: 10 * time.Minute, + // Sets a conservative publish buffer byte limit, per partition. + BufferedByteLimit: 1e8, + } + publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var toRepublish []*pubsub.Message + var mu sync.Mutex + g := new(errgroup.Group) + + for i := 0; i < 10; i++ { + msg := &pubsub.Message{ + Data: []byte(fmt.Sprintf("message-%d", i)), + } + result := publisher.Publish(ctx, msg) + + g.Go(func() error { + id, err := result.Get(ctx) + if err != nil { + // NOTE: A failed PublishResult indicates that the publisher client has + // permanently terminated. A new publisher client instance must be + // created to republish failed messages. + fmt.Printf("Publish error: %v\n", err) + // Oversized messages cannot be published. + if !xerrors.Is(err, pscompat.ErrOversizedMessage) { + mu.Lock() + toRepublish = append(toRepublish, msg) + mu.Unlock() + } + return err + } + fmt.Printf("Published a message with a message ID: %s\n", id) + return nil + }) + } + if err := g.Wait(); err != nil { + fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error()) + switch { + case xerrors.Is(publisher.Error(), pscompat.ErrBackendUnavailable): + // TODO: Create a new publisher client to republish failed messages. + case xerrors.Is(publisher.Error(), pscompat.ErrOverflow): + // TODO: Create a new publisher client to republish failed messages. + // Throttle publishing. Note that backend unavailability can also cause + // buffer overflow before the ErrBackendUnavailable error. + default: + // TODO: Inspect and handle fatal error. 
+ } + } +} + func ExampleSubscriberClient_Receive() { ctx := context.Background() const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" @@ -117,10 +188,49 @@ func ExampleSubscriberClient_Receive() { // TODO: Handle error. } - // Call cancel from callback, or another goroutine. + // Call cancel from the receiver callback or another goroutine to stop + // receiving. cancel() } +// If the application has a low tolerance to backend unavailability, set a lower +// ReceiveSettings.Timeout value to detect and alert. +func ExampleSubscriberClient_Receive_errorHandling() { + ctx := context.Background() + const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" + settings := pscompat.ReceiveSettings{ + // The SubscriberClient will terminate when it cannot connect to backends + // for more than 5 minutes. + Timeout: 5 * time.Minute, + } + subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings) + if err != nil { + // TODO: Handle error. + } + + for { + cctx, cancel := context.WithCancel(ctx) + err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) { + // TODO: Handle message. + // NOTE: May be called concurrently; synchronize access to shared memory. + m.Ack() + }) + if err != nil { + fmt.Printf("Subscriber client stopped receiving due to error: %v\n", err) + if xerrors.Is(err, pscompat.ErrBackendUnavailable) { + // TODO: Alert if necessary. Receive can be retried. + } else { + // TODO: Handle fatal error. + break + } + } + + // Call cancel from the receiver callback or another goroutine to stop + // receiving. + cancel() + } +} + // This example shows how to throttle SubscriberClient.Receive, which aims for // high throughput by default. By limiting the number of messages and/or bytes // being processed at once, you can bound your program's resource consumption. 
@@ -129,9 +239,10 @@ func ExampleSubscriberClient_Receive() { func ExampleSubscriberClient_Receive_maxOutstanding() { ctx := context.Background() const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" - settings := pscompat.DefaultReceiveSettings - settings.MaxOutstandingMessages = 5 - settings.MaxOutstandingBytes = 10e6 + settings := pscompat.ReceiveSettings{ + MaxOutstandingMessages: 5, + MaxOutstandingBytes: 10e6, + } subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings) if err != nil { // TODO: Handle error. @@ -158,9 +269,10 @@ func ExampleSubscriberClient_Receive_maxOutstanding() { func ExampleSubscriberClient_Receive_manualPartitionAssignment() { ctx := context.Background() const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" - settings := pscompat.DefaultReceiveSettings - // NOTE: The corresponding topic must have 2 or more partitions. - settings.Partitions = []int{0, 1} + settings := pscompat.ReceiveSettings{ + // NOTE: The corresponding topic must have 2 or more partitions. + Partitions: []int{0, 1}, + } subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings) if err != nil { // TODO: Handle error. diff --git a/pubsublite/pscompat/publisher.go b/pubsublite/pscompat/publisher.go index 8506912d3a1..6f4e9415a82 100644 --- a/pubsublite/pscompat/publisher.go +++ b/pubsublite/pscompat/publisher.go @@ -39,6 +39,12 @@ var ( // stopping. PublisherClient.Error() returns the error that caused the // publisher client to terminate (if any). Use errors.Is for comparing errors. ErrPublisherStopped = wire.ErrServiceStopped + + // ErrBackendUnavailable indicates that the backend service has been + // unavailable for a period of time. The timeout can be configured using + // PublishSettings.Timeout or ReceiveSettings.Timeout. Use errors.Is for + // comparing errors. 
+ ErrBackendUnavailable = wire.ErrBackendUnavailable ) // PublisherClient is a Pub/Sub Lite client to publish messages to a given diff --git a/pubsublite/pscompat/settings.go b/pubsublite/pscompat/settings.go index cb05b0a276a..1e5b98e53d0 100644 --- a/pubsublite/pscompat/settings.go +++ b/pubsublite/pscompat/settings.go @@ -67,9 +67,12 @@ type PublishSettings struct { // to the server. If Timeout is 0, it will be treated as // DefaultPublishSettings.Timeout. Otherwise must be > 0. // - // The timeout is exceeded, the publisher will terminate with the last error - // that occurred while trying to reconnect. Note that if the timeout duration - // is long, ErrOverflow may occur first. + // If your application has a low tolerance to backend unavailability, set + // Timeout to a lower duration to detect and handle it. When the timeout is + // exceeded, the PublisherClient will terminate with ErrBackendUnavailable and + // details of the last error that occurred while trying to reconnect to + // backends. Note that if the timeout duration is long, ErrOverflow may occur + // first. Timeout time.Duration // The maximum number of bytes that the publisher will keep in memory before @@ -106,8 +109,8 @@ var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, CountThreshold: 100, ByteThreshold: 1e6, - Timeout: 60 * time.Minute, - BufferedByteLimit: 1e8, + Timeout: 7 * 24 * time.Hour, + BufferedByteLimit: 1e10, } func (s *PublishSettings) toWireSettings() wire.PublishSettings { @@ -183,8 +186,11 @@ type ReceiveSettings struct { // to the server. If Timeout is 0, it will be treated as // DefaultReceiveSettings.Timeout. Otherwise must be > 0. // - // The timeout is exceeded, the SubscriberClient will terminate with the last - // error that occurred while trying to reconnect. + // If your application has a low tolerance to backend unavailability, set + // Timeout to a lower duration to detect and handle it. 
When the timeout is + // exceeded, the SubscriberClient will terminate with ErrBackendUnavailable + // and details of the last error that occurred while trying to reconnect to + // backends. Timeout time.Duration // The topic partition numbers (zero-indexed) to receive messages from. @@ -206,7 +212,7 @@ type ReceiveSettings struct { var DefaultReceiveSettings = ReceiveSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: 1e9, - Timeout: 60 * time.Minute, + Timeout: 7 * 24 * time.Hour, } func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings {