Skip to content

Commit

Permalink
fix(pubsub): retry deadline exceeded errors in Acknowledge (#3157)
Browse files Browse the repository at this point in the history
* fix(pubsub): retry deadline exceeded errors in Acknowledge

* add comment about gapic handling unavailable

* handle context deadline exceeded errors as well

* add backoff to context deadline exceeded errors

* fix wording of comments (transparent over transient)
  • Loading branch information
hongalex committed Nov 9, 2020
1 parent c3d0c7b commit ae75b46
Showing 1 changed file with 48 additions and 7 deletions.
55 changes: 48 additions & 7 deletions pubsub/iterator.go
Expand Up @@ -385,12 +385,53 @@ func (it *messageIterator) sendAck(m map[string]bool) bool {
return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error {
recordStat(it.ctx, AckCount, int64(len(ids)))
addAcks(ids)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
return it.subc.Acknowledge(context.Background(), &pb.AcknowledgeRequest{
Subscription: it.subName,
AckIds: ids,
})
bo := gax.Backoff{
Initial: 100 * time.Millisecond,
Max: time.Second,
Multiplier: 2,
}
cctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
for {
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
Subscription: it.subName,
AckIds: ids,
})
// Retry DeadlineExceeded errors a few times before giving up and
// allowing the message to expire and be redelivered.
// The underlying library handles other retries, currently only
// codes.Unavailable.
switch status.Code(err) {
case codes.DeadlineExceeded:
// Use the outer context with timeout here. Errors from gax, including
// context deadline exceeded should be transparent, as unacked messages
// will be redelivered.
if err := gax.Sleep(cctx, bo.Pause()); err != nil {
return nil
}
default:
if err == nil {
return nil
}
// This addresses an error where `context deadline exceeded` errors
// not captured by the previous case causes fatal errors.
// See https://github.com/googleapis/google-cloud-go/issues/3060
if strings.Contains(err.Error(), "context deadline exceeded") {
// Context deadline exceeded errors here should be transparent
// to prevent the iterator from shutting down.
if err := gax.Sleep(cctx, bo.Pause()); err != nil {
return nil
}
continue
}
// Any other error is fatal.
return err
}
}
})
}

Expand Down Expand Up @@ -443,7 +484,7 @@ func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration)
return nil
}
// This addresses an error where `context deadline exceeded` errors
// not captured by the previous case do not cause fatal errors.
// not captured by the previous case causes fatal errors.
// See https://github.com/googleapis/google-cloud-go/issues/3060
if strings.Contains(err.Error(), "context deadline exceeded") {
recordStat(it.ctx, ModAckTimeoutCount, 1)
Expand Down

0 comments on commit ae75b46

Please sign in to comment.