Skip to content

Commit

Permalink
fix(pubsub): respect streamAckDeadlineSeconds with MaxExtensionPeriod (
Browse files Browse the repository at this point in the history
…#3367)

* fix(pubsub): use maxExtensionPeriod in streamAckDeadlineSeconds
  • Loading branch information
hongalex committed Dec 7, 2020
1 parent 9ac287d commit 45131b6
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 27 deletions.
36 changes: 17 additions & 19 deletions pubsub/iterator.go
Expand Up @@ -38,20 +38,19 @@ import (
const gracePeriod = 5 * time.Second

type messageIterator struct {
ctx context.Context
cancel func() // the function that will cancel ctx; called in stop
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
subName string
maxExtensionPeriod *time.Duration
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks (more frequent than acks)
pingTicker *time.Ticker // sends to the stream to keep it open
failed chan struct{} // closed on stream error
drained chan struct{} // closed when stopped && no more pending messages
wg sync.WaitGroup
ctx context.Context
cancel func() // the function that will cancel ctx; called in stop
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks (more frequent than acks)
pingTicker *time.Ticker // sends to the stream to keep it open
failed chan struct{} // closed on stream error
drained chan struct{} // closed when stopped && no more pending messages
wg sync.WaitGroup

mu sync.Mutex
ackTimeDist *distribution.D // dist uses seconds
Expand All @@ -73,7 +72,7 @@ type messageIterator struct {
// subName is the full name of the subscription to pull messages from.
// Stop must be called on the messageIterator when it is no longer needed.
// The iterator always uses the background context for acking messages and extending message deadlines.
func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator {
func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator {
var ps *pullStream
if !po.synchronous {
maxMessages := po.maxOutstandingMessages
Expand All @@ -82,7 +81,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensio
maxMessages = 0
maxBytes = 0
}
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes)
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod)
}
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
// the first keepAlive halfway towards the minimum ack deadline.
Expand All @@ -100,7 +99,6 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensio
po: po,
subc: subc,
subName: subName,
maxExtensionPeriod: maxExtensionPeriod,
kaTick: time.After(keepAlivePeriod),
ackTicker: ackTicker,
nackTicker: nackTicker,
Expand Down Expand Up @@ -579,8 +577,8 @@ func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) {
func (it *messageIterator) ackDeadline() time.Duration {
pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second

if *it.maxExtensionPeriod > 0 && pt > *it.maxExtensionPeriod {
return *it.maxExtensionPeriod
if it.po.maxExtensionPeriod > 0 && pt > it.po.maxExtensionPeriod {
return it.po.maxExtensionPeriod
}
if pt > maxAckDeadline {
return maxAckDeadline
Expand Down
6 changes: 4 additions & 2 deletions pubsub/iterator_test.go
Expand Up @@ -118,8 +118,10 @@ func TestMaxExtensionPeriod(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want := time.Duration(1) * time.Second
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &want, &pullOptions{})
want := 1 * time.Second
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{
maxExtensionPeriod: want,
})

receiveTime := time.Now().Add(time.Duration(-3) * time.Second)
iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
Expand Down
4 changes: 2 additions & 2 deletions pubsub/pullstream.go
Expand Up @@ -39,7 +39,7 @@ type pullStream struct {
// for testing
type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error)

func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages int, maxOutstandingBytes int) *pullStream {
func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
ctx = withSubscriptionKey(ctx, subName)
return &pullStream{
ctx: ctx,
Expand All @@ -50,7 +50,7 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName
err = spc.Send(&pb.StreamingPullRequest{
Subscription: subName,
// We modack messages when we receive them, so this value doesn't matter too much.
StreamAckDeadlineSeconds: 60,
StreamAckDeadlineSeconds: int32(maxDurationPerLeaseExtension / time.Second),
MaxOutstandingMessages: int64(maxOutstandingMessages),
MaxOutstandingBytes: int64(maxOutstandingBytes),
})
Expand Down
2 changes: 1 addition & 1 deletion pubsub/pullstream_test.go
Expand Up @@ -67,7 +67,7 @@ func TestPullStreamGet(t *testing.T) {
test.errors = test.errors[1:]
return &testStreamingPullClient{sendError: err}, nil
}
ps := newPullStream(context.Background(), streamingPull, "", 100, 1000)
ps := newPullStream(context.Background(), streamingPull, "", 100, 1000, 0)
_, err := ps.get(nil)
if got := status.Code(err); got != test.wantCode {
t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode)
Expand Down
13 changes: 10 additions & 3 deletions pubsub/subscription.go
Expand Up @@ -810,6 +810,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// If MaxExtension is negative, disable automatic extension.
maxExt = 0
}
maxExtPeriod := s.ReceiveSettings.MaxExtensionPeriod
if maxExtPeriod < 0 {
maxExtPeriod = 0
}

var numGoroutines int
switch {
case s.ReceiveSettings.Synchronous:
Expand All @@ -822,6 +827,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
po := &pullOptions{
maxExtension: maxExt,
maxExtensionPeriod: maxExtPeriod,
maxPrefetch: trunc32(int64(maxCount)),
synchronous: s.ReceiveSettings.Synchronous,
maxOutstandingMessages: maxCount,
Expand Down Expand Up @@ -853,7 +859,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// The iterator does not use the context passed to Receive. If it did,
// canceling that context would immediately stop the iterator without
// waiting for unacked messages.
iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtension, po)
iter := newMessageIterator(s.c.subc, s.name, po)

// We cannot use errgroup from Receive here. Receive might already be
// calling group.Wait, and group.Wait cannot be called concurrently with
Expand Down Expand Up @@ -951,8 +957,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
}

type pullOptions struct {
maxExtension time.Duration
maxPrefetch int32
maxExtension time.Duration // the maximum time to extend a message's ack deadline in tota
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
maxPrefetch int32
// If true, use unary Pull instead of StreamingPull, and never pull more
// than maxPrefetch messages.
synchronous bool
Expand Down

0 comments on commit 45131b6

Please sign in to comment.