fix(pubsub): retry deadline exceeded errors in Acknowledge #3157

Merged
merged 7 commits into from Nov 9, 2020
55 changes: 48 additions & 7 deletions pubsub/iterator.go
@@ -385,12 +385,53 @@ func (it *messageIterator) sendAck(m map[string]bool) bool {
     return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error {
         recordStat(it.ctx, AckCount, int64(len(ids)))
         addAcks(ids)
-        // Use context.Background() as the call's context, not it.ctx. We don't
-        // want to cancel this RPC when the iterator is stopped.
-        return it.subc.Acknowledge(context.Background(), &pb.AcknowledgeRequest{
-            Subscription: it.subName,
-            AckIds:       ids,
-        })
+        bo := gax.Backoff{
+            Initial:    100 * time.Millisecond,
+            Max:        time.Second,
+            Multiplier: 2,
+        }
+        cctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+        defer cancel()
+        for {
+            // Use context.Background() as the call's context, not it.ctx. We don't
+            // want to cancel this RPC when the iterator is stopped.
+            cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
+            defer cancel2()
+            err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
+                Subscription: it.subName,
+                AckIds:       ids,
+            })
+            // Retry DeadlineExceeded errors a few times before giving up and
+            // allowing the message to expire and be redelivered.
+            // The underlying library handles other retries, currently only
+            // codes.Unavailable.
+            switch status.Code(err) {
+            case codes.DeadlineExceeded:
+                // Use the outer context with timeout here. Errors from gax, including
+                // context deadline exceeded should be transparent, as unacked messages
+                // will be redelivered.
+                if err := gax.Sleep(cctx, bo.Pause()); err != nil {
Contributor

If we get a deadline exceeded error, this looks like a sleep followed by a return, and I'm not seeing the retry. I'd expect a continue here if the intent is to retry the Acknowledge() call on line 400.

Member Author

The gax.Sleep call pauses briefly before retrying the Acknowledge call. The err check on this line, and on the other gax.Sleep call, tests whether the outer 3-minute context has finished. If it has, err != nil and the next line returns nil, which ends the entire call. If err == nil here, the case ends and Acknowledge is called again on the next iteration of the loop.

Contributor

I was misreading the logic, thanks for the clarification.

+                    return nil
+                }
+            default:
+                if err == nil {
+                    return nil
+                }
+                // This addresses an error where `context deadline exceeded` errors
+                // not captured by the previous case causes fatal errors.
+                // See https://github.com/googleapis/google-cloud-go/issues/3060
+                if strings.Contains(err.Error(), "context deadline exceeded") {
+                    // Context deadline exceeded errors here should be transparent
+                    // to prevent the iterator from shutting down.
+                    if err := gax.Sleep(cctx, bo.Pause()); err != nil {
+                        return nil
+                    }
+                    continue
+                }
+                // Any other error is fatal.
+                return err
+            }
+        }
     })
 }

@@ -443,7 +484,7 @@ func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration)
                     return nil
                 }
                 // This addresses an error where `context deadline exceeded` errors
-                // not captured by the previous case do not cause fatal errors.
+                // not captured by the previous case causes fatal errors.
                 // See https://github.com/googleapis/google-cloud-go/issues/3060
                 if strings.Contains(err.Error(), "context deadline exceeded") {
                     recordStat(it.ctx, ModAckTimeoutCount, 1)