fix(pubsublite): fixes for background partition count updates #4293

Merged (4 commits, Jun 22, 2021)
7 changes: 6 additions & 1 deletion pubsublite/internal/wire/partition_count.go
@@ -51,7 +51,7 @@ func newPartitionCountWatcher(ctx context.Context, adminClient *vkit.AdminClient
 		adminClient: adminClient,
 		topicPath:   topicPath,
 		receiver:    receiver,
-		callOption:  retryableReadOnlyCallOption(),
+		callOption:  resourceExhaustedRetryer(),
 	}

 	// Polling the topic partition count can be disabled in settings if the period
@@ -100,6 +100,11 @@ func (p *partitionCountWatcher) updatePartitionCount() {
 			return p.partitionCount, nil
 		}
 		if err != nil {
+			if p.partitionCount > 0 {
+				// Ignore errors after the first update.
+				// TODO: Log the error.
+				return p.partitionCount, nil
+			}
 			err = fmt.Errorf("pubsublite: failed to update topic partition count: %v", err)
 			p.unsafeInitiateShutdown(err)
 			return 0, err
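The hunk above changes the error-handling policy of the partition count poller: an RPC failure is fatal only before the first successful update; once a count is known, later polling failures are ignored and the last known value is kept. Below is a self-contained sketch of that policy. The watcher type and fetchCount function are simplified stand-ins for illustration only; the real code issues a GetTopicPartitions call on the admin client.

// Sketch of the "ignore errors after the first update" policy.
// fetchCount stands in for the GetTopicPartitions RPC.
package main

import (
	"errors"
	"fmt"
)

type watcher struct {
	partitionCount int
	fetchCount     func() (int, error)
}

// pollOnce mirrors updatePartitionCount: an error is fatal only while no
// count has been received yet; afterwards the last known count is returned.
func (w *watcher) pollOnce() (int, error) {
	n, err := w.fetchCount()
	if err != nil {
		if w.partitionCount > 0 {
			// Ignore errors after the first update.
			return w.partitionCount, nil
		}
		return 0, fmt.Errorf("pubsublite: failed to update topic partition count: %v", err)
	}
	w.partitionCount = n
	return n, nil
}

func main() {
	calls := 0
	w := &watcher{fetchCount: func() (int, error) {
		if calls++; calls == 1 {
			return 4, nil
		}
		return 0, errors.New("FailedPrecondition")
	}}
	fmt.Println(w.pollOnce()) // 4 <nil>: initial update succeeds
	fmt.Println(w.pollOnce()) // 4 <nil>: later failure is ignored
}

The effect, which the tests below verify, is that a failed background update no longer tears down an otherwise healthy publisher; the publisher simply continues with a stale partition count until a later poll succeeds.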
23 changes: 23 additions & 0 deletions pubsublite/internal/wire/partition_count_test.go
@@ -122,3 +122,26 @@ func TestPartitionCountWatcherPartitionCountUnchanged(t *testing.T) {
 	watcher.VerifyCounts([]int{wantPartitionCount1, wantPartitionCount2})
 	watcher.StopVerifyNoError()
 }
+
+func TestPartitionCountWatcherIgnoreUpdateErrors(t *testing.T) {
+	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
+	wantPartitionCount := 4
+
+	verifiers := test.NewVerifiers(t)
+	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount), nil)
+	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.FailedPrecondition, ""))
+
+	mockServer.OnTestStart(verifiers)
+	defer mockServer.OnTestEnd()
+
+	watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
+	if gotErr := watcher.StartError(); gotErr != nil {
+		t.Errorf("Start() got err: (%v)", gotErr)
+	}
+	watcher.VerifyCounts([]int{wantPartitionCount}) // Initial count
+
+	// Although the next update is a permanent error, do not terminate.
+	watcher.UpdatePartitionCount()
+	watcher.VerifyCounts([]int{wantPartitionCount})
+	watcher.StopVerifyNoError()
+}
10 changes: 6 additions & 4 deletions pubsublite/internal/wire/publisher_test.go
@@ -1004,14 +1004,16 @@ func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) {
 	t.Run("Failed update", func(t *testing.T) {
 		pub.pub.partitionWatcher.updatePartitionCount()

-		// Failed background update terminates the routingPublisher.
-		if gotErr := pub.WaitStopped(); !test.ErrorHasMsg(gotErr, serverErr.Error()) {
-			t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
-		}
+		// Failed update ignored.
 		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
 			t.Errorf("Num partition publishers: got %d, want %d", got, want)
 		}
 	})
+
+	pub.Stop()
+	if gotErr := pub.WaitStopped(); gotErr != nil {
+		t.Errorf("Stop() got err: (%v)", gotErr)
+	}
 }

 func TestNewPublisherValidatesSettings(t *testing.T) {
57 changes: 41 additions & 16 deletions pubsublite/internal/wire/rpc.go
@@ -110,24 +110,49 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
 	return isEligible(s.Code())
 }

-// retryableReadOnlyCallOption returns a call option that retries with backoff
-// for ResourceExhausted in addition to other default retryable codes for
-// Pub/Sub. Suitable for read-only operations which are subject to only QPS
+// Wraps an ordered list of retryers.
+type compositeRetryer struct {
+	retryers []gax.Retryer
+}
+
+func (cr *compositeRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
+	for _, r := range cr.retryers {
+		pause, shouldRetry = r.Retry(err)
+		if shouldRetry {
+			return
+		}
+	}
+	return 0, false
+}
+
+// resourceExhaustedRetryer returns a call option that retries slowly with
+// backoff for ResourceExhausted in addition to other default retryable codes
+// for Pub/Sub. Suitable for read-only operations which are subject to only QPS
 // quota limits.
-func retryableReadOnlyCallOption() gax.CallOption {
+func resourceExhaustedRetryer() gax.CallOption {
 	return gax.WithRetry(func() gax.Retryer {
-		return gax.OnCodes([]codes.Code{
-			codes.Aborted,
-			codes.DeadlineExceeded,
-			codes.Internal,
-			codes.ResourceExhausted,
-			codes.Unavailable,
-			codes.Unknown,
-		}, gax.Backoff{
-			Initial:    100 * time.Millisecond,
-			Max:        60 * time.Second,
-			Multiplier: 1.3,
-		})
+		return &compositeRetryer{
+			retryers: []gax.Retryer{
+				gax.OnCodes([]codes.Code{
+					codes.ResourceExhausted,
+				}, gax.Backoff{
+					Initial:    time.Second,
+					Max:        60 * time.Second,
+					Multiplier: 3,
+				}),
+				gax.OnCodes([]codes.Code{
+					codes.Aborted,
+					codes.DeadlineExceeded,
+					codes.Internal,
+					codes.Unavailable,
+					codes.Unknown,
+				}, gax.Backoff{
+					Initial:    100 * time.Millisecond,
+					Max:        60 * time.Second,
+					Multiplier: 1.3,
+				}),
+			},
+		}
 	})
 }

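To make the fall-through semantics concrete, here is a small self-contained program exercising compositeRetryer with the same two retryers that resourceExhaustedRetryer builds (it assumes the github.com/googleapis/gax-go/v2 and google.golang.org/grpc modules; the error messages are invented). The first retryer to report shouldRetry wins: ResourceExhausted is retried on the slow schedule, the other retryable codes fall through to the fast schedule, and a code in neither list, such as the FailedPrecondition used in the test above, is not retried at all.

// Demonstrates compositeRetryer fall-through. gax.Backoff.Pause returns
// jittered durations, so only the retry decision is checked, not the pause.
package main

import (
	"fmt"
	"time"

	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type compositeRetryer struct {
	retryers []gax.Retryer
}

// Retry consults each retryer in order and uses the first that accepts.
func (cr *compositeRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
	for _, r := range cr.retryers {
		pause, shouldRetry = r.Retry(err)
		if shouldRetry {
			return
		}
	}
	return 0, false
}

func main() {
	cr := &compositeRetryer{retryers: []gax.Retryer{
		gax.OnCodes([]codes.Code{codes.ResourceExhausted},
			gax.Backoff{Initial: time.Second, Max: 60 * time.Second, Multiplier: 3}),
		gax.OnCodes([]codes.Code{codes.Aborted, codes.Unavailable},
			gax.Backoff{Initial: 100 * time.Millisecond, Max: 60 * time.Second, Multiplier: 1.3}),
	}}

	// Matched by the first retryer: retried on the slow schedule.
	_, retry := cr.Retry(status.Error(codes.ResourceExhausted, "quota"))
	fmt.Println(retry) // true

	// Declined by the first retryer, matched by the second.
	_, retry = cr.Retry(status.Error(codes.Unavailable, "transient"))
	fmt.Println(retry) // true

	// Matched by neither: the RPC fails immediately.
	_, retry = cr.Retry(status.Error(codes.FailedPrecondition, "permanent"))
	fmt.Println(retry) // false
}

Splitting ResourceExhausted onto its own schedule lets QPS-limited admin reads back off aggressively (3x multiplier from an initial one second) without slowing down retries of ordinary transient errors.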