From 9505384b2c771d7d0c95f7786744bdf76174c706 Mon Sep 17 00:00:00 2001 From: shollyman Date: Mon, 30 Aug 2021 08:43:24 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): more metrics instrumentation (#4690) * feat(bigquery/storage/managedwriter): more metrics instrumentation --- .../storage/managedwriter/instrumentation.go | 95 +++++++++++++++---- .../storage/managedwriter/managed_stream.go | 14 +++ 2 files changed, 88 insertions(+), 21 deletions(-) diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index 30140e114c6..464b99a69ed 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -31,69 +31,122 @@ var ( // We allow users to annotate streams with a data origin for monitoring purposes. // See the WithDataOrigin writer option for providing this. keyDataOrigin = tag.MustNewKey("dataOrigin") + + // keyError tags metrics using the status code of returned errors. + keyError = tag.MustNewKey("error") ) +// DefaultOpenCensusViews retains the set of all opencensus views that this +// library has instrumented, to add view registration for exporters. +var DefaultOpenCensusViews []*view.View + const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/" var ( + // AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless) + + // AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried. + // It is EXPERIMENTAL and subject to change or removal without notice. 
+ AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless) + // AppendRequests is a measure of the number of append requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless) - // AppendBytes is a measure of the bytes sent as append requests. + // AppendRequestBytes is a measure of the bytes sent as append requests. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes) + + // AppendRequestErrors is a measure of the number of append requests that errored on send. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless) + + // AppendRequestRows is a measure of the number of append rows sent. // It is EXPERIMENTAL and subject to change or removal without notice. - AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes) + AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless) // AppendResponses is a measure of the number of append responses received. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) + // AppendResponseErrors is a measure of the number of append responses received with an error attached. + // It is EXPERIMENTAL and subject to change or removal without notice. 
+ AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless) + // FlushRequests is a measure of the number of FlushRows requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) +) - // AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened. +var ( + + // AppendClientOpenView is a cumulative sum of AppendClientOpenCount. // It is EXPERIMENTAL and subject to change or removal without notice. - AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless) + AppendClientOpenView *view.View - // AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried. + // AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount. // It is EXPERIMENTAL and subject to change or removal without notice. - AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless) -) + AppendClientOpenRetryView *view.View -var ( // AppendRequestsView is a cumulative sum of AppendRequests. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestsView *view.View - // AppendBytesView is a cumulative sum of AppendBytes. + // AppendRequestBytesView is a cumulative sum of AppendRequestBytes. // It is EXPERIMENTAL and subject to change or removal without notice. - AppendBytesView *view.View + AppendRequestBytesView *view.View - // AppendResponsesView is a cumulative sum of AppendResponses. + // AppendRequestErrorsView is a cumulative sum of AppendRequestErrors. // It is EXPERIMENTAL and subject to change or removal without notice. 
- AppendResponsesView *view.View + AppendRequestErrorsView *view.View - // FlushRequestsView is a cumulative sum of FlushRequests. + // AppendRequestRowsView is a cumulative sum of AppendRequestRows. // It is EXPERIMENTAL and subject to change or removal without notice. - FlushRequestsView *view.View + AppendRequestRowsView *view.View - // AppendClientOpenView is a cumulative sum of AppendClientOpenCount. + // AppendResponsesView is a cumulative sum of AppendResponses. // It is EXPERIMENTAL and subject to change or removal without notice. - AppendClientOpenView *view.View + AppendResponsesView *view.View - // AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount. + // AppendResponseErrorsView is a cumulative sum of AppendResponseErrors. // It is EXPERIMENTAL and subject to change or removal without notice. - AppendClientOpenRetryView *view.View + AppendResponseErrorsView *view.View + + // FlushRequestsView is a cumulative sum of FlushRequests. + // It is EXPERIMENTAL and subject to change or removal without notice. 
+ FlushRequestsView *view.View ) func init() { + AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin) + AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin) + AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin) - AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin) + AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin) + AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError) + AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin) + AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) + AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) + FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) - AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin) - AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin) + + DefaultOpenCensusViews = []*view.View{ + AppendClientOpenView, + AppendClientOpenRetryView, + + AppendRequestsView, + AppendRequestBytesView, + AppendRequestErrorsView, + AppendRequestRowsView, + + AppendResponsesView, + AppendResponseErrorsView, + + FlushRequestsView, + } } func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View { diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 7d50ace47e2..c630e4fe481 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -21,7 +21,9 @@ import ( "sync" "github.com/googleapis/gax-go/v2" + 
"go.opencensus.io/tag" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" + "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -269,7 +271,14 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error err = (*arc).Send(req) } recordStat(ms.ctx, AppendRequests, 1) + recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize)) + recordStat(ms.ctx, AppendRequestRows, int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))) if err != nil { + status := grpcstatus.Convert(err) + if status != nil { + ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())) + recordStat(ctx, AppendRequestErrors, 1) + } bo, shouldRetry := r.Retry(err) if shouldRetry { if err := gax.Sleep(ms.ctx, bo); err != nil { @@ -366,6 +375,11 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl recordStat(ctx, AppendResponses, 1) if status := resp.GetError(); status != nil { + tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())) + if err != nil { + tagCtx = ctx + } + recordStat(tagCtx, AppendResponseErrors, 1) nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc) continue }