From 73f8166a76f9cf07b127ee53459f8095f48796a5 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Thu, 3 Dec 2020 15:02:26 -0800 Subject: [PATCH 1/2] fix(pubsub): use maxExtensionPeriod in streamAckDeadlineSeconds --- pubsub/iterator.go | 36 +++++++++++++++++------------------- pubsub/iterator_test.go | 6 ++++-- pubsub/pullstream.go | 4 ++-- pubsub/pullstream_test.go | 2 +- pubsub/subscription.go | 13 ++++++++++--- 5 files changed, 34 insertions(+), 27 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index da888ad8676..f63f60d5aaa 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -38,20 +38,19 @@ import ( const gracePeriod = 5 * time.Second type messageIterator struct { - ctx context.Context - cancel func() // the function that will cancel ctx; called in stop - po *pullOptions - ps *pullStream - subc *vkit.SubscriberClient - subName string - maxExtensionPeriod *time.Duration - kaTick <-chan time.Time // keep-alive (deadline extensions) - ackTicker *time.Ticker // message acks - nackTicker *time.Ticker // message nacks (more frequent than acks) - pingTicker *time.Ticker // sends to the stream to keep it open - failed chan struct{} // closed on stream error - drained chan struct{} // closed when stopped && no more pending messages - wg sync.WaitGroup + ctx context.Context + cancel func() // the function that will cancel ctx; called in stop + po *pullOptions + ps *pullStream + subc *vkit.SubscriberClient + subName string + kaTick <-chan time.Time // keep-alive (deadline extensions) + ackTicker *time.Ticker // message acks + nackTicker *time.Ticker // message nacks (more frequent than acks) + pingTicker *time.Ticker // sends to the stream to keep it open + failed chan struct{} // closed on stream error + drained chan struct{} // closed when stopped && no more pending messages + wg sync.WaitGroup mu sync.Mutex ackTimeDist *distribution.D // dist uses seconds @@ -73,7 +72,7 @@ type messageIterator struct { // subName is the full name of the subscription to pull messages from. // Stop must be called on the messageIterator when it is no longer needed. // The iterator always uses the background context for acking messages and extending message deadlines. -func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator { +func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator { var ps *pullStream if !po.synchronous { maxMessages := po.maxOutstandingMessages @@ -82,7 +81,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensio maxMessages = 0 maxBytes = 0 } - ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes) + ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod) } // The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending // the first keepAlive halfway towards the minimum ack deadline. @@ -100,7 +99,6 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensio po: po, subc: subc, subName: subName, - maxExtensionPeriod: maxExtensionPeriod, kaTick: time.After(keepAlivePeriod), ackTicker: ackTicker, nackTicker: nackTicker, @@ -579,8 +577,8 @@ func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) { func (it *messageIterator) ackDeadline() time.Duration { pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second - if *it.maxExtensionPeriod > 0 && pt > *it.maxExtensionPeriod { - return *it.maxExtensionPeriod + if it.po.maxExtensionPeriod > 0 && pt > it.po.maxExtensionPeriod { + return it.po.maxExtensionPeriod } if pt > maxAckDeadline { return maxAckDeadline diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index 68ec8c1b5c2..be4f8b4c439 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -118,8 +118,10 @@ func TestMaxExtensionPeriod(t *testing.T) { if err != nil { t.Fatal(err) } - want := time.Duration(1) * time.Second - iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &want, &pullOptions{}) + want := 1 * time.Second + iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{ + maxExtensionPeriod: want, + }) receiveTime := time.Now().Add(time.Duration(-3) * time.Second) iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second)) diff --git a/pubsub/pullstream.go b/pubsub/pullstream.go index 137dac3da9c..a2bc42b643b 100644 --- a/pubsub/pullstream.go +++ b/pubsub/pullstream.go @@ -39,7 +39,7 @@ type pullStream struct { // for testing type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error) -func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages int, maxOutstandingBytes int) *pullStream { +func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream { ctx = withSubscriptionKey(ctx, subName) return &pullStream{ ctx: ctx, @@ -50,7 +50,7 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName err = spc.Send(&pb.StreamingPullRequest{ Subscription: subName, // We modack messages when we receive them, so this value doesn't matter too much. - StreamAckDeadlineSeconds: 60, + StreamAckDeadlineSeconds: int32(maxDurationPerLeaseExtension / time.Second), MaxOutstandingMessages: int64(maxOutstandingMessages), MaxOutstandingBytes: int64(maxOutstandingBytes), }) diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go index 693e5e33ec5..a91f98e67a7 100644 --- a/pubsub/pullstream_test.go +++ b/pubsub/pullstream_test.go @@ -67,7 +67,7 @@ func TestPullStreamGet(t *testing.T) { test.errors = test.errors[1:] return &testStreamingPullClient{sendError: err}, nil } - ps := newPullStream(context.Background(), streamingPull, "", 100, 1000) + ps := newPullStream(context.Background(), streamingPull, "", 100, 1000, 0) _, err := ps.get(nil) if got := status.Code(err); got != test.wantCode { t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 7e5f201145e..1b12d65c264 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -810,6 +810,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // If MaxExtension is negative, disable automatic extension. maxExt = 0 } + maxExtPeriod := s.ReceiveSettings.MaxExtensionPeriod + if maxExtPeriod < 0 { + maxExtPeriod = 0 + } + var numGoroutines int switch { case s.ReceiveSettings.Synchronous: @@ -822,6 +827,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. po := &pullOptions{ maxExtension: maxExt, + maxExtensionPeriod: maxExtPeriod, maxPrefetch: trunc32(int64(maxCount)), synchronous: s.ReceiveSettings.Synchronous, maxOutstandingMessages: maxCount, @@ -853,7 +859,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // The iterator does not use the context passed to Receive. If it did, // canceling that context would immediately stop the iterator without // waiting for unacked messages. - iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtension, po) + iter := newMessageIterator(s.c.subc, s.name, po) // We cannot use errgroup from Receive here. Receive might already be // calling group.Wait, and group.Wait cannot be called concurrently with @@ -951,8 +957,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes } type pullOptions struct { - maxExtension time.Duration - maxPrefetch int32 + maxExtension time.Duration + maxExtensionPeriod time.Duration + maxPrefetch int32 // If true, use unary Pull instead of StreamingPull, and never pull more // than maxPrefetch messages. synchronous bool From e2bc348c2d95eca0a58fd5a0feeb41720c5c4bd6 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 4 Dec 2020 12:08:02 -0800 Subject: [PATCH 2/2] add inline comments for pullOptions --- pubsub/subscription.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 1b12d65c264..729e34f351f 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -957,8 +957,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes } type pullOptions struct { - maxExtension time.Duration - maxExtensionPeriod time.Duration + maxExtension time.Duration // the maximum time to extend a message's ack deadline in tota + maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc maxPrefetch int32 // If true, use unary Pull instead of StreamingPull, and never pull more // than maxPrefetch messages.