Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): respect streamAckDeadlineSeconds with MaxExtensionPeriod #3367

Merged
merged 5 commits into from Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 17 additions & 19 deletions pubsub/iterator.go
Expand Up @@ -38,20 +38,19 @@ import (
const gracePeriod = 5 * time.Second

type messageIterator struct {
ctx context.Context
cancel func() // the function that will cancel ctx; called in stop
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
subName string
maxExtensionPeriod *time.Duration
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks (more frequent than acks)
pingTicker *time.Ticker // sends to the stream to keep it open
failed chan struct{} // closed on stream error
drained chan struct{} // closed when stopped && no more pending messages
wg sync.WaitGroup
ctx context.Context
cancel func() // the function that will cancel ctx; called in stop
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks (more frequent than acks)
pingTicker *time.Ticker // sends to the stream to keep it open
failed chan struct{} // closed on stream error
drained chan struct{} // closed when stopped && no more pending messages
wg sync.WaitGroup

mu sync.Mutex
ackTimeDist *distribution.D // dist uses seconds
Expand All @@ -73,7 +72,7 @@ type messageIterator struct {
// subName is the full name of the subscription to pull messages from.
// Stop must be called on the messageIterator when it is no longer needed.
// The iterator always uses the background context for acking messages and extending message deadlines.
func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator {
func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator {
var ps *pullStream
if !po.synchronous {
maxMessages := po.maxOutstandingMessages
Expand All @@ -82,7 +81,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensio
maxMessages = 0
maxBytes = 0
}
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes)
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod)
}
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
// the first keepAlive halfway towards the minimum ack deadline.
Expand All @@ -100,7 +99,6 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensio
po: po,
subc: subc,
subName: subName,
maxExtensionPeriod: maxExtensionPeriod,
kaTick: time.After(keepAlivePeriod),
ackTicker: ackTicker,
nackTicker: nackTicker,
Expand Down Expand Up @@ -579,8 +577,8 @@ func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) {
func (it *messageIterator) ackDeadline() time.Duration {
pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second

if *it.maxExtensionPeriod > 0 && pt > *it.maxExtensionPeriod {
return *it.maxExtensionPeriod
if it.po.maxExtensionPeriod > 0 && pt > it.po.maxExtensionPeriod {
return it.po.maxExtensionPeriod
}
if pt > maxAckDeadline {
return maxAckDeadline
Expand Down
6 changes: 4 additions & 2 deletions pubsub/iterator_test.go
Expand Up @@ -118,8 +118,10 @@ func TestMaxExtensionPeriod(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want := time.Duration(1) * time.Second
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &want, &pullOptions{})
want := 1 * time.Second
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{
maxExtensionPeriod: want,
})

receiveTime := time.Now().Add(time.Duration(-3) * time.Second)
iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
Expand Down
4 changes: 2 additions & 2 deletions pubsub/pullstream.go
Expand Up @@ -39,7 +39,7 @@ type pullStream struct {
// for testing
type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error)

func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages int, maxOutstandingBytes int) *pullStream {
func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
ctx = withSubscriptionKey(ctx, subName)
return &pullStream{
ctx: ctx,
Expand All @@ -50,7 +50,7 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName
err = spc.Send(&pb.StreamingPullRequest{
Subscription: subName,
// We modack messages when we receive them, so this value doesn't matter too much.
StreamAckDeadlineSeconds: 60,
StreamAckDeadlineSeconds: int32(maxDurationPerLeaseExtension / time.Second),
MaxOutstandingMessages: int64(maxOutstandingMessages),
MaxOutstandingBytes: int64(maxOutstandingBytes),
})
Expand Down
2 changes: 1 addition & 1 deletion pubsub/pullstream_test.go
Expand Up @@ -67,7 +67,7 @@ func TestPullStreamGet(t *testing.T) {
test.errors = test.errors[1:]
return &testStreamingPullClient{sendError: err}, nil
}
ps := newPullStream(context.Background(), streamingPull, "", 100, 1000)
ps := newPullStream(context.Background(), streamingPull, "", 100, 1000, 0)
_, err := ps.get(nil)
if got := status.Code(err); got != test.wantCode {
t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode)
Expand Down
13 changes: 10 additions & 3 deletions pubsub/subscription.go
Expand Up @@ -810,6 +810,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// If MaxExtension is negative, disable automatic extension.
maxExt = 0
}
maxExtPeriod := s.ReceiveSettings.MaxExtensionPeriod
if maxExtPeriod < 0 {
maxExtPeriod = 0
}

var numGoroutines int
switch {
case s.ReceiveSettings.Synchronous:
Expand All @@ -822,6 +827,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
po := &pullOptions{
maxExtension: maxExt,
maxExtensionPeriod: maxExtPeriod,
maxPrefetch: trunc32(int64(maxCount)),
synchronous: s.ReceiveSettings.Synchronous,
maxOutstandingMessages: maxCount,
Expand Down Expand Up @@ -853,7 +859,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// The iterator does not use the context passed to Receive. If it did,
// canceling that context would immediately stop the iterator without
// waiting for unacked messages.
iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtension, po)
iter := newMessageIterator(s.c.subc, s.name, po)

// We cannot use errgroup from Receive here. Receive might already be
// calling group.Wait, and group.Wait cannot be called concurrently with
Expand Down Expand Up @@ -951,8 +957,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
}

type pullOptions struct {
maxExtension time.Duration
maxPrefetch int32
maxExtension time.Duration
maxExtensionPeriod time.Duration
maxPrefetch int32
// If true, use unary Pull instead of StreamingPull, and never pull more
// than maxPrefetch messages.
synchronous bool
Expand Down