diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 2675d42c4e5..49a3742d3c3 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -385,12 +385,53 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error { recordStat(it.ctx, AckCount, int64(len(ids))) addAcks(ids) - // Use context.Background() as the call's context, not it.ctx. We don't - // want to cancel this RPC when the iterator is stopped. - return it.subc.Acknowledge(context.Background(), &pb.AcknowledgeRequest{ - Subscription: it.subName, - AckIds: ids, - }) + bo := gax.Backoff{ + Initial: 100 * time.Millisecond, + Max: time.Second, + Multiplier: 2, + } + cctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + for { + // Use context.Background() as the call's context, not it.ctx. We don't + // want to cancel this RPC when the iterator is stopped. + cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ + Subscription: it.subName, + AckIds: ids, + }) + // Retry DeadlineExceeded errors a few times before giving up and + // allowing the message to expire and be redelivered. + // The underlying library handles other retries, currently only + // codes.Unavailable. + switch status.Code(err) { + case codes.DeadlineExceeded: + // Use the outer context with timeout here. Errors from gax, including + // context deadline exceeded, should be transparent, as unacked messages + // will be redelivered. + if err := gax.Sleep(cctx, bo.Pause()); err != nil { + return nil + } + default: + if err == nil { + return nil + } + // This addresses an error where `context deadline exceeded` errors + // not captured by the previous case cause fatal errors. 
+ // See https://github.com/googleapis/google-cloud-go/issues/3060 + if strings.Contains(err.Error(), "context deadline exceeded") { + // Context deadline exceeded errors here should be transparent + // to prevent the iterator from shutting down. + if err := gax.Sleep(cctx, bo.Pause()); err != nil { + return nil + } + continue + } + // Any other error is fatal. + return err + } + } }) } @@ -443,7 +484,7 @@ func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration) return nil } // This addresses an error where `context deadline exceeded` errors - // not captured by the previous case do not cause fatal errors. + // not captured by the previous case cause fatal errors. // See https://github.com/googleapis/google-cloud-go/issues/3060 if strings.Contains(err.Error(), "context deadline exceeded") { recordStat(it.ctx, ModAckTimeoutCount, 1)