Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): more instrumentation support (#…
Browse files Browse the repository at this point in the history
…4601)

This PR adds a bytes metric to the list of defined instrumentation
metrics, and adds an additional key to track data origin.  The ability
for users to set the data origin comes from a new WithDataOrigin option
that can be passed to the managed stream constructor.

This also does some minor refactoring of how opencensus view creation
is handled.

Towards: #4366
  • Loading branch information
shollyman committed Aug 12, 2021
1 parent 04424f4 commit ff488c8
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 13 deletions.
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/client.go
Expand Up @@ -126,7 +126,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
}
if ms.streamSettings != nil {
if ms.ctx != nil {
ms.ctx = keyContextWithStreamID(ms.ctx, ms.streamSettings.streamID)
ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin)
}
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
} else {
Expand Down
48 changes: 36 additions & 12 deletions bigquery/storage/managedwriter/instrumentation.go
Expand Up @@ -27,6 +27,10 @@ import (
var (
// Metrics on a stream are tagged with the stream ID.
keyStream = tag.MustNewKey("streamID")

// We allow users to annotate streams with a data origin for monitoring purposes.
// See the WithDataOrigin writer option for providing this.
keyDataOrigin = tag.MustNewKey("dataOrigin")
)

const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
Expand All @@ -36,6 +40,10 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)

// AppendBytes is a measure of the bytes sent as append requests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)

// AppendResponses is a measure of the number of append responses received.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
Expand All @@ -58,6 +66,10 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestsView *view.View

// AppendBytesView is a cumulative sum of AppendBytes.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytesView *view.View

// AppendResponsesView is a cumulative sum of AppendResponses.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponsesView *view.View
Expand All @@ -76,31 +88,43 @@ var (
)

func init() {
AppendRequestsView = createCountView(stats.Measure(AppendRequests), keyStream)
AppendResponsesView = createCountView(stats.Measure(AppendResponses), keyStream)
FlushRequestsView = createCountView(stats.Measure(FlushRequests), keyStream)
AppendClientOpenView = createCountView(stats.Measure(AppendClientOpenCount), keyStream)
AppendClientOpenRetryView = createCountView(stats.Measure(AppendClientOpenRetryCount), keyStream)
AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin)
AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
}

func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: view.Sum(),
Aggregation: agg,
}
}

var logOnce sync.Once
// createSumView builds a view over measure m whose recorded values are
// aggregated as a sum and broken down by the supplied tag keys.
func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
	sumAgg := view.Sum()
	return createView(m, sumAgg, keys...)
}

var logTagStreamOnce sync.Once
var logTagOriginOnce sync.Once

// keyContextWithStreamID returns a new context modified with the streamID tag.
func keyContextWithStreamID(ctx context.Context, streamID string) context.Context {
// keyContextWithTags returns a new context modified with the instrumentation tags.
func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
if err != nil {
logOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'stream' key: %v", err)
logTagStreamOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
})
}
ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
if err != nil {
logTagOriginOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
})
}
return ctx
Expand Down
4 changes: 4 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -112,6 +112,10 @@ type streamSettings struct {
// TraceID can be set when appending data on a stream. Its
// purpose is to aid in debug and diagnostic scenarios.
TraceID string

// dataOrigin can be set for classifying metrics generated
// by a stream.
dataOrigin string
}

func defaultStreamSettings() *streamSettings {
Expand Down
8 changes: 8 additions & 0 deletions bigquery/storage/managedwriter/writer_option.go
Expand Up @@ -77,3 +77,11 @@ func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
ms.schemaDescriptor = dp
}
}

// WithDataOrigin returns a WriterOption that records dataOrigin in the
// stream's settings. The library uses this value to tag the instrumentation
// metrics it emits, so that metrics can be classified by origin.
func WithDataOrigin(dataOrigin string) WriterOption {
	applyOrigin := func(stream *ManagedStream) {
		stream.streamSettings.dataOrigin = dataOrigin
	}
	return applyOrigin
}
11 changes: 11 additions & 0 deletions bigquery/storage/managedwriter/writer_option_test.go
Expand Up @@ -83,6 +83,17 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "WithDataOrigin",
options: []WriterOption{WithDataOrigin("origin")},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.dataOrigin = "origin"
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
Expand Down

0 comments on commit ff488c8

Please sign in to comment.