From 04e81496862975bf664fb7314dcd981fd96d6ff9 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 6 Nov 2020 14:34:25 -0800 Subject: [PATCH 1/5] fix(pubsub): retry deadline exceeded errors in Acknowledge --- pubsub/iterator.go | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 2675d42c4e5..f759e172b0f 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -385,12 +385,35 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error { recordStat(it.ctx, AckCount, int64(len(ids))) addAcks(ids) - // Use context.Background() as the call's context, not it.ctx. We don't - // want to cancel this RPC when the iterator is stopped. - return it.subc.Acknowledge(context.Background(), &pb.AcknowledgeRequest{ - Subscription: it.subName, - AckIds: ids, - }) + bo := gax.Backoff{ + Initial: 100 * time.Millisecond, + Max: time.Second, + Multiplier: 2, + } + cctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + for { + // Use context.Background() as the call's context, not it.ctx. We don't + // want to cancel this RPC when the iterator is stopped. + cctx2, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ + Subscription: it.subName, + AckIds: ids, + }) + // Retry DeadlineExceeded errors a few times before giving up and + // allowing the message to expire and be redelivered. + switch status.Code(err) { + case codes.DeadlineExceeded: + // Use the outer context with timeout here. Deadline exceeded errors + // from gax should be transient, as unacked messages will be redelivered. + if err := gax.Sleep(cctx, bo.Pause()); err != nil { + return nil + } + default: + return err + } + } }) } From 3c5ec9022cd189ceb08899897b13355ecb1612e3 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 6 Nov 2020 15:00:16 -0800 Subject: [PATCH 2/5] add comment about gapic handling unavailable --- pubsub/iterator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index f759e172b0f..a294f248ce9 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -403,6 +403,8 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { }) // Retry DeadlineExceeded errors a few times before giving up and // allowing the message to expire and be redelivered. + // The underlying library handles other retries, currently only + // codes.Unavailable. switch status.Code(err) { case codes.DeadlineExceeded: // Use the outer context with timeout here. Deadline exceeded errors From 427c964e2d96f7f608e7dc08ea033fc77d9548b0 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 6 Nov 2020 15:06:41 -0800 Subject: [PATCH 3/5] handle context deadline exceeded errors as well --- pubsub/iterator.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index a294f248ce9..22500bc64bb 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -407,12 +407,23 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { // codes.Unavailable. switch status.Code(err) { case codes.DeadlineExceeded: - // Use the outer context with timeout here. Deadline exceeded errors - // from gax should be transient, as unacked messages will be redelivered. + // Use the outer context with timeout here. Errors from gax, including + // context deadline exceeded should be transient, as unacked messages + // will be redelivered. if err := gax.Sleep(cctx, bo.Pause()); err != nil { return nil } default: + if err == nil { + return nil + } + // This addresses an error where `context deadline exceeded` errors + // not captured by the previous case causes fatal errors. + // See https://github.com/googleapis/google-cloud-go/issues/3060 + if strings.Contains(err.Error(), "context deadline exceeded") { + return nil + } + // Any other error is fatal. return err } } @@ -468,7 +479,7 @@ func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration) return nil } // This addresses an error where `context deadline exceeded` errors - // not captured by the previous case do not cause fatal errors. + // not captured by the previous case causes fatal errors. // See https://github.com/googleapis/google-cloud-go/issues/3060 if strings.Contains(err.Error(), "context deadline exceeded") { recordStat(it.ctx, ModAckTimeoutCount, 1) From 73b1b43a3ab067369c279e755faed472e86a1b81 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 6 Nov 2020 15:12:54 -0800 Subject: [PATCH 4/5] add backoff to context deadline exceeded errors --- pubsub/iterator.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 22500bc64bb..72504c59120 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -395,8 +395,8 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { for { // Use context.Background() as the call's context, not it.ctx. We don't // want to cancel this RPC when the iterator is stopped. - cctx2, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() + cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ Subscription: it.subName, AckIds: ids, @@ -421,7 +421,12 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { // not captured by the previous case causes fatal errors. // See https://github.com/googleapis/google-cloud-go/issues/3060 if strings.Contains(err.Error(), "context deadline exceeded") { - return nil + // Context deadline exceeded errors here should be transient + // to prevent the iterator from shutting down. + if err := gax.Sleep(cctx, bo.Pause()); err != nil { + return nil + } + continue } // Any other error is fatal. return err From c1dcb1968e7f3e0ff16e3d6db17b4e51afd3e7fb Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 6 Nov 2020 15:17:10 -0800 Subject: [PATCH 5/5] fix wording of comments (transparent over transient) --- pubsub/iterator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 72504c59120..49a3742d3c3 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -408,7 +408,7 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { switch status.Code(err) { case codes.DeadlineExceeded: // Use the outer context with timeout here. Errors from gax, including - // context deadline exceeded should be transient, as unacked messages + // context deadline exceeded should be transparent, as unacked messages // will be redelivered. if err := gax.Sleep(cctx, bo.Pause()); err != nil { return nil @@ -421,7 +421,7 @@ func (it *messageIterator) sendAck(m map[string]bool) bool { // not captured by the previous case causes fatal errors. // See https://github.com/googleapis/google-cloud-go/issues/3060 if strings.Contains(err.Error(), "context deadline exceeded") { - // Context deadline exceeded errors here should be transient + // Context deadline exceeded errors here should be transparent // to prevent the iterator from shutting down. if err := gax.Sleep(cctx, bo.Pause()); err != nil { return nil