Skip to content

Commit

Permalink
feat(pubsub): add opencensus metrics for outstanding messages/bytes (#…
Browse files Browse the repository at this point in the history
…3690)

* feat(pubsub): add opencensus metrics for outstanding messages/bytes

* fix exported variable comment
  • Loading branch information
hongalex committed Feb 10, 2021
1 parent 00a78aa commit 4039b82
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 20 deletions.
24 changes: 19 additions & 5 deletions pubsub/flow_controller.go
Expand Up @@ -31,6 +31,8 @@ type flowController struct {
// small releases.
// Atomic.
countRemaining int64
// Number of outstanding bytes remaining. Atomic.
bytesRemaining int64
}

// newFlowController creates a new flowController that ensures no more than
Expand Down Expand Up @@ -72,7 +74,11 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
return err
}
}
atomic.AddInt64(&f.countRemaining, 1)
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return nil
}

Expand All @@ -81,7 +87,7 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
//
// tryAcquire allows large messages to proceed by treating a size greater than
// maxSize as if it were equal to maxSize.
func (f *flowController) tryAcquire(size int) bool {
func (f *flowController) tryAcquire(ctx context.Context, size int) bool {
if f.semCount != nil {
if !f.semCount.TryAcquire(1) {
return false
Expand All @@ -95,13 +101,21 @@ func (f *flowController) tryAcquire(size int) bool {
return false
}
}
atomic.AddInt64(&f.countRemaining, 1)
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return true
}

// release notes that one message of size bytes is no longer outstanding.
func (f *flowController) release(size int) {
atomic.AddInt64(&f.countRemaining, -1)
func (f *flowController) release(ctx context.Context, size int) {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

if f.semCount != nil {
f.semCount.Release(1)
}
Expand Down
27 changes: 14 additions & 13 deletions pubsub/flow_controller_test.go
Expand Up @@ -41,7 +41,7 @@ func TestFlowControllerCancel(t *testing.T) {
// Control: a context that is not done should always return nil.
go func() {
time.Sleep(5 * time.Millisecond)
fc.release(5)
fc.release(ctx, 5)
}()
if err := fc.acquire(context.Background(), 6); err != nil {
t.Errorf("got %v, expected nil", err)
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestFlowControllerNoStarve(t *testing.T) {
case first <- 1:
default:
}
fc.release(1)
fc.release(ctx, 1)
}
}()
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestFlowControllerSaturation(t *testing.T) {
if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
return errors.New("negative size")
}
fc.release(test.acquireSize)
fc.release(ctx, test.acquireSize)
}
return success
})
Expand All @@ -177,19 +177,20 @@ func TestFlowControllerSaturation(t *testing.T) {
func TestFlowControllerTryAcquire(t *testing.T) {
t.Parallel()
fc := newFlowController(3, 10)
ctx := context.Background()

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(4) {
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire 7 bytes.
if fc.tryAcquire(7) {
if fc.tryAcquire(ctx, 7) {
t.Error("got true, wanted false")
}

// Successfully tryAcquire 6 byte.
if !fc.tryAcquire(6) {
if !fc.tryAcquire(ctx, 6) {
t.Error("got false, wanted true")
}
}
Expand All @@ -205,12 +206,12 @@ func TestFlowControllerUnboundedCount(t *testing.T) {
}

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(4) {
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire 3 bytes.
if fc.tryAcquire(3) {
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
}
}
Expand All @@ -223,9 +224,9 @@ func TestFlowControllerUnboundedCount2(t *testing.T) {
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
fc.release(1)
fc.release(1)
fc.release(1)
fc.release(ctx, 1)
fc.release(ctx, 1)
fc.release(ctx, 1)
wantCount := int64(-2)
c := int64(fc.count())
if c != wantCount {
Expand All @@ -244,12 +245,12 @@ func TestFlowControllerUnboundedBytes(t *testing.T) {
}

// Successfully tryAcquire 4GB bytes.
if !fc.tryAcquire(4e9) {
if !fc.tryAcquire(ctx, 4e9) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire a third message.
if fc.tryAcquire(3) {
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
}
}
4 changes: 2 additions & 2 deletions pubsub/subscription.go
Expand Up @@ -920,7 +920,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
old := ackh.doneFunc
msgLen := len(msg.Data)
ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
defer fc.release(msgLen)
defer fc.release(ctx, msgLen)
old(ackID, ack, receiveTime)
}
wg.Add(1)
Expand Down Expand Up @@ -957,7 +957,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
}

type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in tota
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
maxPrefetch int32
// If true, use unary Pull instead of StreamingPull, and never pull more
Expand Down
30 changes: 30 additions & 0 deletions pubsub/trace.go
Expand Up @@ -84,6 +84,14 @@ var (
// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)

// OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)

// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
)

var (
Expand Down Expand Up @@ -130,6 +138,14 @@ var (
// StreamResponseCountView is a cumulative sum of StreamResponseCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCountView *view.View

// OutstandingMessagesView is the last value of OutstandingMessages
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingMessagesView *view.View

// OutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytesView *view.View
)

func init() {
Expand All @@ -144,6 +160,8 @@ func init() {
StreamRetryCountView = createCountView(StreamRetryCount, keySubscription)
StreamRequestCountView = createCountView(StreamRequestCount, keySubscription)
StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)

DefaultPublishViews = []*view.View{
PublishedMessagesView,
Expand All @@ -160,6 +178,8 @@ func init() {
StreamRetryCountView,
StreamRequestCountView,
StreamResponseCountView,
OutstandingMessagesView,
OutstandingBytesView,
}
}

Expand Down Expand Up @@ -190,6 +210,16 @@ func createDistView(m stats.Measure, keys ...tag.Key) *view.View {
}
}

func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: view.LastValue(),
}
}

var logOnce sync.Once

// withSubscriptionKey returns a new context modified with the subscriptionKey tag map.
Expand Down

0 comments on commit 4039b82

Please sign in to comment.