From ff488c86b9c1a1f02397bb579905fa049e59ac05 Mon Sep 17 00:00:00 2001 From: shollyman Date: Thu, 12 Aug 2021 09:46:09 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): more instrumentation support (#4601) This PR adds a bytes metric to the list of defined instrumentation metrics, and adds an additional key to track data origin. Ability for users to set the data origin comes a new WithDataOrigin option that can be passed to the managed stream constructor. This also does some minor refactoring of how opencensus view creation is handled. Towards: https://github.com/googleapis/google-cloud-go/issues/4366 --- bigquery/storage/managedwriter/client.go | 2 +- .../storage/managedwriter/instrumentation.go | 48 ++++++++++++++----- .../storage/managedwriter/managed_stream.go | 4 ++ .../storage/managedwriter/writer_option.go | 8 ++++ .../managedwriter/writer_option_test.go | 11 +++++ 5 files changed, 60 insertions(+), 13 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 847aca65abe..75e27d90773 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -126,7 +126,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient } if ms.streamSettings != nil { if ms.ctx != nil { - ms.ctx = keyContextWithStreamID(ms.ctx, ms.streamSettings.streamID) + ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin) } ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes) } else { diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index ba14ace754c..30140e114c6 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -27,6 +27,10 @@ import ( var ( // Metrics on a stream are tagged with the stream ID. keyStream = tag.MustNewKey("streamID") + + // We allow users to annotate streams with a data origin for monitoring purposes. + // See the WithDataOrigin writer option for providing this. + keyDataOrigin = tag.MustNewKey("dataOrigin") ) const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/" @@ -36,6 +40,10 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless) + // AppendBytes is a measure of the bytes sent as append requests. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes) + // AppendResponses is a measure of the number of append responses received. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) @@ -58,6 +66,10 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestsView *view.View + // AppendBytesView is a cumulative sum of AppendBytes. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendBytesView *view.View + // AppendResponsesView is a cumulative sum of AppendResponses. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponsesView *view.View @@ -76,31 +88,43 @@ var ( ) func init() { - AppendRequestsView = createCountView(stats.Measure(AppendRequests), keyStream) - AppendResponsesView = createCountView(stats.Measure(AppendResponses), keyStream) - FlushRequestsView = createCountView(stats.Measure(FlushRequests), keyStream) - AppendClientOpenView = createCountView(stats.Measure(AppendClientOpenCount), keyStream) - AppendClientOpenRetryView = createCountView(stats.Measure(AppendClientOpenRetryCount), keyStream) + AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin) + AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin) + AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) + FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) + AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin) + AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin) } -func createCountView(m stats.Measure, keys ...tag.Key) *view.View { +func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View { return &view.View{ Name: m.Name(), Description: m.Description(), TagKeys: keys, Measure: m, - Aggregation: view.Sum(), + Aggregation: agg, } } -var logOnce sync.Once +func createSumView(m stats.Measure, keys ...tag.Key) *view.View { + return createView(m, view.Sum(), keys...) +} + +var logTagStreamOnce sync.Once +var logTagOriginOnce sync.Once -// keyContextWithStreamID returns a new context modified with the streamID tag. -func keyContextWithStreamID(ctx context.Context, streamID string) context.Context { +// keyContextWithStreamID returns a new context modified with the instrumentation tags. +func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context { ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID)) if err != nil { - logOnce.Do(func() { - log.Printf("managedwriter: error creating tag map for 'stream' key: %v", err) + logTagStreamOnce.Do(func() { + log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err) + }) + } + ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin)) + if err != nil { + logTagOriginOnce.Do(func() { + log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err) }) } return ctx diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 1064d49314c..7d50ace47e2 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -112,6 +112,10 @@ type streamSettings struct { // TraceID can be set when appending data on a stream. It's // purpose is to aid in debug and diagnostic scenarios. TraceID string + + // dataOrigin can be set for classifying metrics generated + // by a stream. + dataOrigin string } func defaultStreamSettings() *streamSettings { diff --git a/bigquery/storage/managedwriter/writer_option.go b/bigquery/storage/managedwriter/writer_option.go index efd33f03c8f..9c06ddd7286 100644 --- a/bigquery/storage/managedwriter/writer_option.go +++ b/bigquery/storage/managedwriter/writer_option.go @@ -77,3 +77,11 @@ func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { ms.schemaDescriptor = dp } } + +// WithDataOrigin is used to attach an origin context to the instrumentation metrics +// emitted by the library. +func WithDataOrigin(dataOrigin string) WriterOption { + return func(ms *ManagedStream) { + ms.streamSettings.dataOrigin = dataOrigin + } +} diff --git a/bigquery/storage/managedwriter/writer_option_test.go b/bigquery/storage/managedwriter/writer_option_test.go index ed2119bf4b4..72480a3e651 100644 --- a/bigquery/storage/managedwriter/writer_option_test.go +++ b/bigquery/storage/managedwriter/writer_option_test.go @@ -83,6 +83,17 @@ func TestWriterOptions(t *testing.T) { return ms }(), }, + { + desc: "WithDataOrigin", + options: []WriterOption{WithDataOrigin("origin")}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + } + ms.streamSettings.dataOrigin = "origin" + return ms + }(), + }, { desc: "multiple", options: []WriterOption{