Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add opencensus metrics for outstanding messages/bytes #3690

Merged
merged 3 commits into from Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 19 additions & 5 deletions pubsub/flow_controller.go
Expand Up @@ -31,6 +31,8 @@ type flowController struct {
// small releases.
// Atomic.
countRemaining int64
// Number of outstanding bytes remaining. Atomic.
bytesRemaining int64
}

// newFlowController creates a new flowController that ensures no more than
Expand Down Expand Up @@ -72,7 +74,11 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
return err
}
}
atomic.AddInt64(&f.countRemaining, 1)
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return nil
}

Expand All @@ -81,7 +87,7 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
//
// tryAcquire allows large messages to proceed by treating a size greater than
// maxSize as if it were equal to maxSize.
func (f *flowController) tryAcquire(size int) bool {
func (f *flowController) tryAcquire(ctx context.Context, size int) bool {
if f.semCount != nil {
if !f.semCount.TryAcquire(1) {
return false
Expand All @@ -95,13 +101,21 @@ func (f *flowController) tryAcquire(size int) bool {
return false
}
}
atomic.AddInt64(&f.countRemaining, 1)
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return true
}

// release notes that one message of size bytes is no longer outstanding.
func (f *flowController) release(size int) {
atomic.AddInt64(&f.countRemaining, -1)
func (f *flowController) release(ctx context.Context, size int) {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

if f.semCount != nil {
f.semCount.Release(1)
}
Expand Down
27 changes: 14 additions & 13 deletions pubsub/flow_controller_test.go
Expand Up @@ -41,7 +41,7 @@ func TestFlowControllerCancel(t *testing.T) {
// Control: a context that is not done should always return nil.
go func() {
time.Sleep(5 * time.Millisecond)
fc.release(5)
fc.release(ctx, 5)
}()
if err := fc.acquire(context.Background(), 6); err != nil {
t.Errorf("got %v, expected nil", err)
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestFlowControllerNoStarve(t *testing.T) {
case first <- 1:
default:
}
fc.release(1)
fc.release(ctx, 1)
}
}()
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestFlowControllerSaturation(t *testing.T) {
if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
return errors.New("negative size")
}
fc.release(test.acquireSize)
fc.release(ctx, test.acquireSize)
}
return success
})
Expand All @@ -177,19 +177,20 @@ func TestFlowControllerSaturation(t *testing.T) {
func TestFlowControllerTryAcquire(t *testing.T) {
t.Parallel()
fc := newFlowController(3, 10)
ctx := context.Background()

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(4) {
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire 7 bytes.
if fc.tryAcquire(7) {
if fc.tryAcquire(ctx, 7) {
t.Error("got true, wanted false")
}

// Successfully tryAcquire 6 byte.
if !fc.tryAcquire(6) {
if !fc.tryAcquire(ctx, 6) {
t.Error("got false, wanted true")
}
}
Expand All @@ -205,12 +206,12 @@ func TestFlowControllerUnboundedCount(t *testing.T) {
}

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(4) {
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire 3 bytes.
if fc.tryAcquire(3) {
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
}
}
Expand All @@ -223,9 +224,9 @@ func TestFlowControllerUnboundedCount2(t *testing.T) {
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
fc.release(1)
fc.release(1)
fc.release(1)
fc.release(ctx, 1)
fc.release(ctx, 1)
fc.release(ctx, 1)
wantCount := int64(-2)
c := int64(fc.count())
if c != wantCount {
Expand All @@ -244,12 +245,12 @@ func TestFlowControllerUnboundedBytes(t *testing.T) {
}

// Successfully tryAcquire 4GB bytes.
if !fc.tryAcquire(4e9) {
if !fc.tryAcquire(ctx, 4e9) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire a third message.
if fc.tryAcquire(3) {
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
}
}
4 changes: 2 additions & 2 deletions pubsub/subscription.go
Expand Up @@ -920,7 +920,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
old := ackh.doneFunc
msgLen := len(msg.Data)
ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
defer fc.release(msgLen)
defer fc.release(ctx, msgLen)
old(ackID, ack, receiveTime)
}
wg.Add(1)
Expand Down Expand Up @@ -957,7 +957,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
}

type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in tota
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
maxPrefetch int32
// If true, use unary Pull instead of StreamingPull, and never pull more
Expand Down
30 changes: 30 additions & 0 deletions pubsub/trace.go
Expand Up @@ -84,6 +84,14 @@ var (
// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)

// OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)

// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
)

var (
Expand Down Expand Up @@ -130,6 +138,14 @@ var (
// StreamResponseCountView is a cumulative sum of StreamResponseCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCountView *view.View

// OutstandingMessagesView is the last value of OutstandingMessages
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingMessagesView *view.View

// OutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytesView *view.View
)

func init() {
Expand All @@ -144,6 +160,8 @@ func init() {
StreamRetryCountView = createCountView(StreamRetryCount, keySubscription)
StreamRequestCountView = createCountView(StreamRequestCount, keySubscription)
StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)

DefaultPublishViews = []*view.View{
PublishedMessagesView,
Expand All @@ -160,6 +178,8 @@ func init() {
StreamRetryCountView,
StreamRequestCountView,
StreamResponseCountView,
OutstandingMessagesView,
OutstandingBytesView,
}
}

Expand Down Expand Up @@ -190,6 +210,16 @@ func createDistView(m stats.Measure, keys ...tag.Key) *view.View {
}
}

func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: view.LastValue(),
}
}

var logOnce sync.Once

// withSubscriptionKey returns a new context modified with the subscriptionKey tag map.
Expand Down