From 73b6f5e012d0b89d36850cb986fd7e288bf1e3c5 Mon Sep 17 00:00:00 2001 From: shollyman Date: Fri, 30 Jul 2021 11:04:10 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): add opencensus instrumentation (#4512) This starts to plumb in oc instrumentation, as we're already using it in bigquery proper and other veneers. Testing instrumentation helped catch another double-close in the recv processor, so this addresses that as well. Towards https://github.com/googleapis/google-cloud-go/issues/4366 --- bigquery/go.mod | 1 + bigquery/storage/managedwriter/client.go | 4 +- .../storage/managedwriter/instrumentation.go | 111 +++++++++++++ .../storage/managedwriter/integration_test.go | 146 +++++++++++++----- .../storage/managedwriter/managed_stream.go | 6 + 5 files changed, 225 insertions(+), 43 deletions(-) create mode 100644 bigquery/storage/managedwriter/instrumentation.go diff --git a/bigquery/go.mod b/bigquery/go.mod index 5640611c1c3..0b922c177d2 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 github.com/googleapis/gax-go/v2 v2.0.5 + go.opencensus.io v0.23.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/api v0.51.0 diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index e12fe61970f..014cd7dcd29 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -74,7 +74,6 @@ func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*M } func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) { - ctx, cancel := context.WithCancel(ctx) ms := &ManagedStream{ @@ -122,6 +121,9 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient } } if ms.streamSettings != nil { + if ms.ctx != nil { + ms.ctx = keyContextWithStreamID(ms.ctx, ms.streamSettings.streamID) + } ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes) } else { ms.fc = newFlowController(0, 0) diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go new file mode 100644 index 00000000000..ba14ace754c --- /dev/null +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -0,0 +1,111 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package managedwriter + +import ( + "context" + "log" + "sync" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + // Metrics on a stream are tagged with the stream ID. + keyStream = tag.MustNewKey("streamID") +) + +const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/" + +var ( + // AppendRequests is a measure of the number of append requests sent. 
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
+
+	// AppendResponses is a measure of the number of append responses received.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses received", stats.UnitDimensionless)
+
+	// FlushRequests is a measure of the number of FlushRows requests sent.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
+
+	// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
+
+	// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
+)
+
+var (
+	// AppendRequestsView is a cumulative sum of AppendRequests.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendRequestsView *view.View
+
+	// AppendResponsesView is a cumulative sum of AppendResponses.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendResponsesView *view.View
+
+	// FlushRequestsView is a cumulative sum of FlushRequests.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	FlushRequestsView *view.View
+
+	// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendClientOpenView *view.View
+
+	// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	AppendClientOpenRetryView *view.View
+)
+
+func init() {
+	AppendRequestsView = createCountView(stats.Measure(AppendRequests), keyStream)
+	AppendResponsesView = createCountView(stats.Measure(AppendResponses), keyStream)
+	FlushRequestsView = createCountView(stats.Measure(FlushRequests), keyStream)
+	AppendClientOpenView = createCountView(stats.Measure(AppendClientOpenCount), keyStream)
+	AppendClientOpenRetryView = createCountView(stats.Measure(AppendClientOpenRetryCount), keyStream)
+}
+
+func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
+	return &view.View{
+		Name:        m.Name(),
+		Description: m.Description(),
+		TagKeys:     keys,
+		Measure:     m,
+		Aggregation: view.Sum(),
+	}
+}
+
+var logOnce sync.Once
+
+// keyContextWithStreamID returns a new context modified with the streamID tag.
+func keyContextWithStreamID(ctx context.Context, streamID string) context.Context { + ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID)) + if err != nil { + logOnce.Do(func() { + log.Printf("managedwriter: error creating tag map for 'stream' key: %v", err) + }) + } + return ctx +} + +func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { + stats.Record(ctx, m.M(n)) +} diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index b181f804cf1..493166a95de 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -17,6 +17,7 @@ package managedwriter import ( "context" "fmt" + "math" "testing" "time" @@ -25,6 +26,7 @@ import ( "cloud.google.com/go/bigquery/storage/managedwriter/testdata" "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" + "go.opencensus.io/stats/view" "google.golang.org/api/option" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -45,6 +47,14 @@ var testSimpleSchema = bigquery.Schema{ {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, } +var testSimpleData = []*testdata.SimpleMessage{ + {Name: "one", Value: 1}, + {Name: "two", Value: 2}, + {Name: "three", Value: 3}, + {Name: "four", Value: 1}, + {Name: "five", Value: 2}, +} + func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) { if testing.Short() { t.Skip("Integration tests skipped in short mode") @@ -162,12 +172,15 @@ func TestIntegration_ManagedWriter(t *testing.T) { t.Parallel() testPendingStream(ctx, t, mwClient, bqClient, dataset) }) + t.Run("Instrumentation", func(t *testing.T) { + // Don't run this in parallel, we only want to collect stats from this subtest. + testInstrumentation(ctx, t, mwClient, bqClient, dataset) + }) }) + } func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { - - // prep a suitable destination table. testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) @@ -188,17 +201,9 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } validateRowCount(ctx, t, bqClient, testTable, 0, "before send") - testData := []*testdata.SimpleMessage{ - {Name: "one", Value: 1}, - {Name: "two", Value: 2}, - {Name: "three", Value: 3}, - {Name: "four", Value: 1}, - {Name: "five", Value: 2}, - } - // First, send the test rows individually. var results []*AppendResult - for k, mesg := range testData { + for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) @@ -211,12 +216,12 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } // wait for the result to indicate ready, then validate. results[0].Ready() - wantRows := int64(len(testData)) + wantRows := int64(len(testSimpleData)) validateRowCount(ctx, t, bqClient, testTable, wantRows, "after first send") // Now, send the test rows grouped into in a single append. 
var data [][]byte - for k, mesg := range testData { + for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) @@ -313,16 +318,8 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC validateRowCount(ctx, t, bqClient, testTable, 0, "before send") - testData := []*testdata.SimpleMessage{ - {Name: "one", Value: 1}, - {Name: "two", Value: 2}, - {Name: "three", Value: 3}, - {Name: "four", Value: 1}, - {Name: "five", Value: 2}, - } - var expectedRows int64 - for k, mesg := range testData { + for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) @@ -368,16 +365,8 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq } validateRowCount(ctx, t, bqClient, testTable, 0, "before send") - testData := []*testdata.SimpleMessage{ - {Name: "one", Value: 1}, - {Name: "two", Value: 2}, - {Name: "three", Value: 3}, - {Name: "four", Value: 1}, - {Name: "five", Value: 2}, - } - var results []*AppendResult - for k, mesg := range testData { + for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) @@ -390,7 +379,7 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq } // wait for the result to indicate ready, then validate. results[0].Ready() - wantRows := int64(len(testData)) + wantRows := int64(len(testSimpleData)) validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send") } @@ -413,17 +402,9 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } validateRowCount(ctx, t, bqClient, testTable, 0, "before send") - testData := []*testdata.SimpleMessage{ - {Name: "one", Value: 1}, - {Name: "two", Value: 2}, - {Name: "three", Value: 3}, - {Name: "four", Value: 1}, - {Name: "five", Value: 2}, - } - // Send data. var results []*AppendResult - for k, mesg := range testData { + for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) @@ -435,7 +416,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } } results[0].Ready() - wantRows := int64(len(testData)) + wantRows := int64(len(testSimpleData)) // Mark stream complete. trackedOffset, err := ms.Finalize(ctx) @@ -457,3 +438,84 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send") } + +func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + testedViews := []*view.View{ + AppendRequestsView, + AppendResponsesView, + AppendClientOpenView, + } + + if err := view.Register(testedViews...); err != nil { + t.Fatalf("couldn't register views: %v", err) + } + + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { + t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.SimpleMessage{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. 
+ ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(DefaultStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + var results []*AppendResult + for k, mesg := range testSimpleData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + results, err = ms.AppendRows(ctx, data, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + } + // wait for the result to indicate ready. + results[0].Ready() + // Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance + // to report. + time.Sleep(time.Second) + + for _, tv := range testedViews { + metricData, err := view.RetrieveData(tv.Name) + if err != nil { + t.Errorf("view %q RetrieveData: %v", tv.Name, err) + } + if len(metricData) > 1 { + t.Errorf("%q: only expected 1 row, got %d", tv.Name, len(metricData)) + } + if len(metricData[0].Tags) != 1 { + t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags)) + } + entry := metricData[0].Data + sum, ok := entry.(*view.SumData) + if !ok { + t.Errorf("unexpected metric type: %T", entry) + } + got := sum.Value + var want int64 + switch tv { + case AppendRequestsView: + want = int64(len(testSimpleData)) + case AppendResponsesView: + want = int64(len(testSimpleData)) + case AppendClientOpenView: + want = 1 + } + + // float comparison; diff more than error bound is error + if math.Abs(got-float64(want)) > 0.1 { + t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want) + } + } +} diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 4b5a5e07134..f66e58efe4c 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -143,6 +143,7 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, er }, } resp, err := ms.c.rawClient.FlushRows(ctx, req) + recordStat(ms.ctx, FlushRequests, 1) if err != nil { return 0, err } @@ -196,9 +197,11 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { r := defaultRetryer{} for { + recordStat(ms.ctx, AppendClientOpenCount, 1) arc, err := ms.open() bo, shouldRetry := r.Retry(err) if err != nil && shouldRetry { + recordStat(ms.ctx, AppendClientOpenRetryCount, 1) if err := gax.Sleep(ms.ctx, bo); err != nil { return nil, nil, err } @@ -257,6 +260,7 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error // we had to amend the initial request err = (*arc).Send(req) } + recordStat(ms.ctx, AppendRequests, 1) if err != nil { bo, shouldRetry := r.Retry(err) if shouldRetry { @@ -349,7 +353,9 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl resp, err := arc.Recv() if err != nil { nextWrite.markDone(NoStreamOffset, err, fc) + continue } + recordStat(ctx, AppendResponses, 1) if status := resp.GetError(); status != nil { fc.release(nextWrite.reqSize)
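
A minimal usage sketch (not part of the commit above): it assumes only the exported names introduced in instrumentation.go in this patch (AppendRequestsView and the other views, plus the streamID tag); the main function, the client/stream setup placeholder, and the logging are illustrative. It shows how a consumer of the managedwriter package might register the new OpenCensus views and read back the cumulative counts, mirroring what testInstrumentation does with view.Register and view.RetrieveData.

package main

import (
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"go.opencensus.io/stats/view"
)

func main() {
	// Register the managedwriter views so their counts are aggregated.
	// Without a registered view, the recordStat calls added in this patch
	// produce no retrievable view data.
	if err := view.Register(
		managedwriter.AppendRequestsView,
		managedwriter.AppendResponsesView,
		managedwriter.FlushRequestsView,
		managedwriter.AppendClientOpenView,
		managedwriter.AppendClientOpenRetryView,
	); err != nil {
		log.Fatalf("view.Register: %v", err)
	}

	// ... create a managedwriter.Client, open a ManagedStream, and append rows ...

	// Read back the cumulative sum of append requests. Because every metric is
	// tagged with the streamID key (see keyContextWithStreamID), each stream
	// shows up as its own row under the view.
	rows, err := view.RetrieveData(managedwriter.AppendRequestsView.Name)
	if err != nil {
		log.Fatalf("view.RetrieveData: %v", err)
	}
	for _, r := range rows {
		if sum, ok := r.Data.(*view.SumData); ok {
			log.Printf("tags=%v append_requests=%v", r.Tags, sum.Value)
		}
	}
}

In a real service you would typically pair view.Register with an OpenCensus exporter (Prometheus, Stackdriver, etc.) rather than calling view.RetrieveData by hand; direct retrieval is mainly useful in tests such as testInstrumentation above.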