diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 3f165a0ac18..c3c0a9e9a92 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -31,6 +31,8 @@ type flowController struct { // small releases. // Atomic. countRemaining int64 + // Number of outstanding bytes remaining. Atomic. + bytesRemaining int64 } // newFlowController creates a new flowController that ensures no more than @@ -72,7 +74,11 @@ func (f *flowController) acquire(ctx context.Context, size int) error { return err } } - atomic.AddInt64(&f.countRemaining, 1) + outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) + recordStat(ctx, OutstandingMessages, outstandingMessages) + outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) + recordStat(ctx, OutstandingBytes, outstandingBytes) + return nil } @@ -81,7 +87,7 @@ func (f *flowController) acquire(ctx context.Context, size int) error { // // tryAcquire allows large messages to proceed by treating a size greater than // maxSize as if it were equal to maxSize. -func (f *flowController) tryAcquire(size int) bool { +func (f *flowController) tryAcquire(ctx context.Context, size int) bool { if f.semCount != nil { if !f.semCount.TryAcquire(1) { return false @@ -95,13 +101,21 @@ func (f *flowController) tryAcquire(size int) bool { return false } } - atomic.AddInt64(&f.countRemaining, 1) + outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) + recordStat(ctx, OutstandingMessages, outstandingMessages) + outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) + recordStat(ctx, OutstandingBytes, outstandingBytes) + return true } // release notes that one message of size bytes is no longer outstanding. -func (f *flowController) release(size int) { - atomic.AddInt64(&f.countRemaining, -1) +func (f *flowController) release(ctx context.Context, size int) { + outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) + recordStat(ctx, OutstandingMessages, outstandingMessages) + outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) + recordStat(ctx, OutstandingBytes, outstandingBytes) + if f.semCount != nil { f.semCount.Release(1) } diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index 71c41cbf165..765577a661d 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -41,7 +41,7 @@ func TestFlowControllerCancel(t *testing.T) { // Control: a context that is not done should always return nil. go func() { time.Sleep(5 * time.Millisecond) - fc.release(5) + fc.release(ctx, 5) }() if err := fc.acquire(context.Background(), 6); err != nil { t.Errorf("got %v, expected nil", err) @@ -79,7 +79,7 @@ func TestFlowControllerNoStarve(t *testing.T) { case first <- 1: default: } - fc.release(1) + fc.release(ctx, 1) } }() } @@ -162,7 +162,7 @@ func TestFlowControllerSaturation(t *testing.T) { if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 { return errors.New("negative size") } - fc.release(test.acquireSize) + fc.release(ctx, test.acquireSize) } return success }) @@ -177,19 +177,20 @@ func TestFlowControllerSaturation(t *testing.T) { func TestFlowControllerTryAcquire(t *testing.T) { t.Parallel() fc := newFlowController(3, 10) + ctx := context.Background() // Successfully tryAcquire 4 bytes. - if !fc.tryAcquire(4) { + if !fc.tryAcquire(ctx, 4) { t.Error("got false, wanted true") } // Fail to tryAcquire 7 bytes. - if fc.tryAcquire(7) { + if fc.tryAcquire(ctx, 7) { t.Error("got true, wanted false") } // Successfully tryAcquire 6 byte. - if !fc.tryAcquire(6) { + if !fc.tryAcquire(ctx, 6) { t.Error("got false, wanted true") } } @@ -205,12 +206,12 @@ func TestFlowControllerUnboundedCount(t *testing.T) { } // Successfully tryAcquire 4 bytes. - if !fc.tryAcquire(4) { + if !fc.tryAcquire(ctx, 4) { t.Error("got false, wanted true") } // Fail to tryAcquire 3 bytes. - if fc.tryAcquire(3) { + if fc.tryAcquire(ctx, 3) { t.Error("got true, wanted false") } } @@ -223,9 +224,9 @@ func TestFlowControllerUnboundedCount2(t *testing.T) { if err := fc.acquire(ctx, 4); err != nil { t.Errorf("got %v, wanted no error", err) } - fc.release(1) - fc.release(1) - fc.release(1) + fc.release(ctx, 1) + fc.release(ctx, 1) + fc.release(ctx, 1) wantCount := int64(-2) c := int64(fc.count()) if c != wantCount { @@ -244,12 +245,12 @@ func TestFlowControllerUnboundedBytes(t *testing.T) { } // Successfully tryAcquire 4GB bytes. - if !fc.tryAcquire(4e9) { + if !fc.tryAcquire(ctx, 4e9) { t.Error("got false, wanted true") } // Fail to tryAcquire a third message. - if fc.tryAcquire(3) { + if fc.tryAcquire(ctx, 3) { t.Error("got true, wanted false") } } diff --git a/pubsub/subscription.go b/pubsub/subscription.go index d1dee3ca9a1..0c79b31a523 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -920,7 +920,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes old := ackh.doneFunc msgLen := len(msg.Data) ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { - defer fc.release(msgLen) + defer fc.release(ctx, msgLen) old(ackID, ack, receiveTime) } wg.Add(1) @@ -957,7 +957,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes } type pullOptions struct { - maxExtension time.Duration // the maximum time to extend a message's ack deadline in tota + maxExtension time.Duration // the maximum time to extend a message's ack deadline in total maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc maxPrefetch int32 // If true, use unary Pull instead of StreamingPull, and never pull more diff --git a/pubsub/trace.go b/pubsub/trace.go index cb331725ddb..84cab3cd3cf 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -84,6 +84,14 @@ var ( // StreamResponseCount is a measure of the number of responses received on a streaming-pull stream. // It is EXPERIMENTAL and subject to change or removal without notice. StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless) + + // OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed. + // It is EXPERIMENTAL and subject to change or removal without notice. + OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless) + + // OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up. + // It is EXPERIMENTAL and subject to change or removal without notice. + OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless) ) var ( @@ -130,6 +138,14 @@ var ( // StreamResponseCountView is a cumulative sum of StreamResponseCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamResponseCountView *view.View + + // OutstandingMessagesView is the last value of OutstandingMessages + // It is EXPERIMENTAL and subject to change or removal without notice. + OutstandingMessagesView *view.View + + // OutstandingBytesView is the last value of OutstandingBytes + // It is EXPERIMENTAL and subject to change or removal without notice. + OutstandingBytesView *view.View ) func init() { @@ -144,6 +160,8 @@ func init() { StreamRetryCountView = createCountView(StreamRetryCount, keySubscription) StreamRequestCountView = createCountView(StreamRequestCount, keySubscription) StreamResponseCountView = createCountView(StreamResponseCount, keySubscription) + OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription) + OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription) DefaultPublishViews = []*view.View{ PublishedMessagesView, @@ -160,6 +178,8 @@ func init() { StreamRetryCountView, StreamRequestCountView, StreamResponseCountView, + OutstandingMessagesView, + OutstandingBytesView, } } @@ -190,6 +210,16 @@ func createDistView(m stats.Measure, keys ...tag.Key) *view.View { } } +func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View { + return &view.View{ + Name: m.Name(), + Description: m.Description(), + TagKeys: keys, + Measure: m, + Aggregation: view.LastValue(), + } +} + var logOnce sync.Once // withSubscriptionKey returns a new context modified with the subscriptionKey tag map.