Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): more instrumentation support #4601

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/client.go
Expand Up @@ -126,7 +126,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
}
if ms.streamSettings != nil {
if ms.ctx != nil {
ms.ctx = keyContextWithStreamID(ms.ctx, ms.streamSettings.streamID)
ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin)
}
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
} else {
Expand Down
48 changes: 36 additions & 12 deletions bigquery/storage/managedwriter/instrumentation.go
Expand Up @@ -27,6 +27,10 @@ import (
var (
// Metrics on a stream are tagged with the stream ID.
keyStream = tag.MustNewKey("streamID")

// We allow users to annotate streams with a data origin for monitoring purposes.
// See the WithDataOrigin writer option for providing this.
keyDataOrigin = tag.MustNewKey("dataOrigin")
)

const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
Expand All @@ -36,6 +40,10 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)

// AppendBytes is a measure of the bytes sent as append requests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)

// AppendResponses is a measure of the number of append responses received.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
Expand All @@ -58,6 +66,10 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestsView *view.View

// AppendBytesView is a cumulative sum of AppendBytes.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytesView *view.View

// AppendResponsesView is a cumulative sum of AppendResponses.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponsesView *view.View
Expand All @@ -76,31 +88,43 @@ var (
)

func init() {
AppendRequestsView = createCountView(stats.Measure(AppendRequests), keyStream)
AppendResponsesView = createCountView(stats.Measure(AppendResponses), keyStream)
FlushRequestsView = createCountView(stats.Measure(FlushRequests), keyStream)
AppendClientOpenView = createCountView(stats.Measure(AppendClientOpenCount), keyStream)
AppendClientOpenRetryView = createCountView(stats.Measure(AppendClientOpenRetryCount), keyStream)
AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin)
AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
}

func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: view.Sum(),
Aggregation: agg,
}
}

var logOnce sync.Once
func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
return createView(m, view.Sum(), keys...)
}

var logTagStreamOnce sync.Once
var logTagOriginOnce sync.Once

// keyContextWithStreamID returns a new context modified with the streamID tag.
func keyContextWithStreamID(ctx context.Context, streamID string) context.Context {
// keyContextWithStreamID returns a new context modified with the instrumentation tags.
func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
if err != nil {
logOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'stream' key: %v", err)
logTagStreamOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
})
}
ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
if err != nil {
logTagOriginOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
})
}
return ctx
Expand Down
4 changes: 4 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -112,6 +112,10 @@ type streamSettings struct {
// TraceID can be set when appending data on a stream. It's
// purpose is to aid in debug and diagnostic scenarios.
TraceID string

// dataOrigin can be set for classifying metrics generated
// by a stream.
dataOrigin string
}

func defaultStreamSettings() *streamSettings {
Expand Down
8 changes: 8 additions & 0 deletions bigquery/storage/managedwriter/writer_option.go
Expand Up @@ -77,3 +77,11 @@ func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
ms.schemaDescriptor = dp
}
}

// WithDataOrigin is used to attach an origin context to the instrumentation metrics
// emitted by the library.
func WithDataOrigin(dataOrigin string) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.dataOrigin = dataOrigin
}
}
11 changes: 11 additions & 0 deletions bigquery/storage/managedwriter/writer_option_test.go
Expand Up @@ -83,6 +83,17 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "WithDataOrigin",
options: []WriterOption{WithDataOrigin("origin")},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.dataOrigin = "origin"
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
Expand Down