Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): more metrics instrumentation (#…
Browse files Browse the repository at this point in the history
…4690)

* feat(bigquery/storage/managedwriter): more metrics instrumentation
  • Loading branch information
shollyman committed Aug 30, 2021
1 parent 1cc9ce0 commit 9505384
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 21 deletions.
95 changes: 74 additions & 21 deletions bigquery/storage/managedwriter/instrumentation.go
Expand Up @@ -31,69 +31,122 @@ var (
// We allow users to annotate streams with a data origin for monitoring purposes.
// See the WithDataOrigin writer option for providing this.
keyDataOrigin = tag.MustNewKey("dataOrigin")

// keyError tags metrics using the status code of returned errors.
keyError = tag.MustNewKey("error")
)

// DefaultOpenCensusViews retains the set of all opencensus views that this
// library has instrumented, to add view registration for exporters.
var DefaultOpenCensusViews []*view.View

const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"

var (
// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)

// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)

// AppendRequests is a measure of the number of append requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)

// AppendBytes is a measure of the bytes sent as append requests.
// AppendRequestBytes is a measure of the bytes sent as append requests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)

// AppendRequestErrors is a measure of the number of append requests that errored on send.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)

// AppendRequestRows is a measure of the number of append rows sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)

// AppendResponses is a measure of the number of append responses received.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)

// AppendResponseErrors is a measure of the number of append responses received with an error attached.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)

// FlushRequests is a measure of the number of FlushRows requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
)

// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
var (

// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
AppendClientOpenView *view.View

// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
)
AppendClientOpenRetryView *view.View

var (
// AppendRequestsView is a cumulative sum of AppendRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestsView *view.View

// AppendBytesView is a cumulative sum of AppendBytes.
// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytesView *view.View
AppendRequestBytesView *view.View

// AppendResponsesView is a cumulative sum of AppendResponses.
// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponsesView *view.View
AppendRequestErrorsView *view.View

// FlushRequestsView is a cumulative sum of FlushRequests.
// AppendRequestRowsView is a cumulative sum of AppendRows.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequestsView *view.View
AppendRequestRowsView *view.View

// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
// AppendResponsesView is a cumulative sum of AppendResponses.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenView *view.View
AppendResponsesView *view.View

// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryView *view.View
AppendResponseErrorsView *view.View

// FlushRequestsView is a cumulative sum of FlushRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequestsView *view.View
)

func init() {
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)

AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin)
AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)

AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)

FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)

DefaultOpenCensusViews = []*view.View{
AppendClientOpenView,
AppendClientOpenRetryView,

AppendRequestsView,
AppendRequestBytesView,
AppendRequestErrorsView,
AppendRequestRowsView,

AppendResponsesView,
AppendResponseErrorsView,

FlushRequestsView,
}
}

func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
Expand Down
14 changes: 14 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -21,7 +21,9 @@ import (
"sync"

"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -269,7 +271,14 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error
err = (*arc).Send(req)
}
recordStat(ms.ctx, AppendRequests, 1)
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
recordStat(ms.ctx, AppendRequestRows, int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows())))
if err != nil {
status := grpcstatus.Convert(err)
if status != nil {
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
recordStat(ctx, AppendRequestErrors, 1)
}
bo, shouldRetry := r.Retry(err)
if shouldRetry {
if err := gax.Sleep(ms.ctx, bo); err != nil {
Expand Down Expand Up @@ -366,6 +375,11 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
recordStat(ctx, AppendResponses, 1)

if status := resp.GetError(); status != nil {
tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
if err != nil {
tagCtx = ctx
}
recordStat(tagCtx, AppendResponseErrors, 1)
nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc)
continue
}
Expand Down

0 comments on commit 9505384

Please sign in to comment.