From 634847b7499fb58575e3e5001dd8e6da0661fccd Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 23 Jun 2021 09:22:07 +1000 Subject: [PATCH] fix(pubsublite): fixes for background partition count updates (#4293) - When refreshing topic partition counts in the background for Publishers, retry slowly upon resource exhausted errors. - partitionCountWatcher ignores errors after the first update. --- pubsublite/internal/wire/partition_count.go | 7 ++- .../internal/wire/partition_count_test.go | 23 ++++++++ pubsublite/internal/wire/publisher_test.go | 10 ++-- pubsublite/internal/wire/rpc.go | 57 +++++++++++++------ 4 files changed, 76 insertions(+), 21 deletions(-) diff --git a/pubsublite/internal/wire/partition_count.go b/pubsublite/internal/wire/partition_count.go index d1e724a4e3e..c4319a536f0 100644 --- a/pubsublite/internal/wire/partition_count.go +++ b/pubsublite/internal/wire/partition_count.go @@ -51,7 +51,7 @@ func newPartitionCountWatcher(ctx context.Context, adminClient *vkit.AdminClient adminClient: adminClient, topicPath: topicPath, receiver: receiver, - callOption: retryableReadOnlyCallOption(), + callOption: resourceExhaustedRetryer(), } // Polling the topic partition count can be disabled in settings if the period @@ -100,6 +100,11 @@ func (p *partitionCountWatcher) updatePartitionCount() { return p.partitionCount, nil } if err != nil { + if p.partitionCount > 0 { + // Ignore errors after the first update. + // TODO: Log the error. + return p.partitionCount, nil + } err = fmt.Errorf("pubsublite: failed to update topic partition count: %v", err) p.unsafeInitiateShutdown(err) return 0, err diff --git a/pubsublite/internal/wire/partition_count_test.go b/pubsublite/internal/wire/partition_count_test.go index 05d9558b214..aef4387e0b9 100644 --- a/pubsublite/internal/wire/partition_count_test.go +++ b/pubsublite/internal/wire/partition_count_test.go @@ -122,3 +122,26 @@ func TestPartitionCountWatcherPartitionCountUnchanged(t *testing.T) { watcher.VerifyCounts([]int{wantPartitionCount1, wantPartitionCount2}) watcher.StopVerifyNoError() } + +func TestPartitionCountWatcherIgnoreUpdateErrors(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + wantPartitionCount := 4 + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount), nil) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.FailedPrecondition, "")) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings()) + if gotErr := watcher.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + watcher.VerifyCounts([]int{wantPartitionCount}) // Initial count + + // Although the next update is a permanent error, do not terminate. + watcher.UpdatePartitionCount() + watcher.VerifyCounts([]int{wantPartitionCount}) + watcher.StopVerifyNoError() +} diff --git a/pubsublite/internal/wire/publisher_test.go b/pubsublite/internal/wire/publisher_test.go index 0711c50dc1c..b407c651dc4 100644 --- a/pubsublite/internal/wire/publisher_test.go +++ b/pubsublite/internal/wire/publisher_test.go @@ -1004,14 +1004,16 @@ func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) { t.Run("Failed update", func(t *testing.T) { pub.pub.partitionWatcher.updatePartitionCount() - // Failed background update terminates the routingPublisher. - if gotErr := pub.WaitStopped(); !test.ErrorHasMsg(gotErr, serverErr.Error()) { - t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) - } + // Failed update ignored. if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { t.Errorf("Num partition publishers: got %d, want %d", got, want) } }) + + pub.Stop() + if gotErr := pub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } } func TestNewPublisherValidatesSettings(t *testing.T) { diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index 0aa282057df..188f148bb9c 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -110,24 +110,49 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool { return isEligible(s.Code()) } -// retryableReadOnlyCallOption returns a call option that retries with backoff -// for ResourceExhausted in addition to other default retryable codes for -// Pub/Sub. Suitable for read-only operations which are subject to only QPS +// Wraps an ordered list of retryers. Earlier retryers take precedence. +type compositeRetryer struct { + retryers []gax.Retryer +} + +func (cr *compositeRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { + for _, r := range cr.retryers { + pause, shouldRetry = r.Retry(err) + if shouldRetry { + return + } + } + return 0, false +} + +// resourceExhaustedRetryer returns a call option that retries slowly with +// backoff for ResourceExhausted in addition to other default retryable codes +// for Pub/Sub. Suitable for read-only operations which are subject to only QPS // quota limits. -func retryableReadOnlyCallOption() gax.CallOption { +func resourceExhaustedRetryer() gax.CallOption { return gax.WithRetry(func() gax.Retryer { - return gax.OnCodes([]codes.Code{ - codes.Aborted, - codes.DeadlineExceeded, - codes.Internal, - codes.ResourceExhausted, - codes.Unavailable, - codes.Unknown, - }, gax.Backoff{ - Initial: 100 * time.Millisecond, - Max: 60 * time.Second, - Multiplier: 1.3, - }) + return &compositeRetryer{ + retryers: []gax.Retryer{ + gax.OnCodes([]codes.Code{ + codes.ResourceExhausted, + }, gax.Backoff{ + Initial: time.Second, + Max: 60 * time.Second, + Multiplier: 3, + }), + gax.OnCodes([]codes.Code{ + codes.Aborted, + codes.DeadlineExceeded, + codes.Internal, + codes.Unavailable, + codes.Unknown, + }, gax.Backoff{ + Initial: 100 * time.Millisecond, + Max: 60 * time.Second, + Multiplier: 1.3, + }), + }, + } }) }