From b0853846a34a32ca45deb92a3cc6ab843473acd8 Mon Sep 17 00:00:00 2001
From: shollyman
Date: Mon, 26 Jul 2021 12:30:39 -0700
Subject: [PATCH] feat(bigquery/storage/managedwriter): add append stream
 plumbing (#4452)

This PR adds enough of the wiring to the client to begin testing via
integration tests. It adopts a pattern similar to the pullstream in
pubsub, in that it abstracts individual calls from stream state
management.

There are two significant units of future work that may yield changes
here:

* For traffic efficiency's sake, we only want to add things like the
  stream ID, schema, and trace ID to the first append on any stream.

* For stream connection retry, we may want to re-send writes that were
  sent but for which we didn't get an acknowledgement back. For
  default/committed streams, this behavior may yield additional writes
  (at-least-once semantics). For buffered/pending streams, it means
  either the library or the user should know to expect "data already
  present" errors for these resent writes.

Towards https://github.com/googleapis/google-cloud-go/issues/4366
---
 bigquery/storage/managedwriter/client.go       |  71 ++++--
 bigquery/storage/managedwriter/doc.go          |   3 +-
 .../storage/managedwriter/integration_test.go  | 207 ++++++++++++++++++
 .../storage/managedwriter/managed_stream.go    | 206 +++++++++++++++++
 .../managedwriter/managed_stream_test.go       | 183 ++++++++++++++++
 bigquery/storage/managedwriter/retry.go        |  43 ++++
 .../managedwriter/testdata/messages.pb.go      | 177 +++++++++++++++
 .../managedwriter/testdata/messages.proto      |  28 +++
 .../storage/managedwriter/writer_option.go     |   9 +
 .../managedwriter/writer_option_test.go        |   4 +-
 10 files changed, 910 insertions(+), 21 deletions(-)
 create mode 100644 bigquery/storage/managedwriter/integration_test.go
 create mode 100644 bigquery/storage/managedwriter/managed_stream_test.go
 create mode 100644 bigquery/storage/managedwriter/retry.go
 create mode 100644 bigquery/storage/managedwriter/testdata/messages.pb.go
 create mode 100644 bigquery/storage/managedwriter/testdata/messages.proto

diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go
index 4d0bd68bdc0..d8f60ed933b 100644
--- a/bigquery/storage/managedwriter/client.go
+++ b/bigquery/storage/managedwriter/client.go
@@ -21,8 +21,10 @@ import (
 	"strings"
 
 	storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
+	"github.com/googleapis/gax-go/v2"
 	"google.golang.org/api/option"
 	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+	"google.golang.org/grpc"
 )
 
 // Client is a managed BigQuery Storage write client scoped to a single project.
@@ -53,12 +55,40 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
 	}, nil
 }
 
+// Close releases resources held by the client.
+func (c *Client) Close() error {
+	// TODO: consider if we should propagate a cancellation from client to all associated managed streams.
+	if c.rawClient == nil {
+		return fmt.Errorf("already closed")
+	}
+	c.rawClient.Close()
+	c.rawClient = nil
+	return nil
+}
+
 // NewManagedStream establishes a new managed stream for appending data into a table.
+//
+// Context here is retained for use by the underlying streaming connections the managed stream may create.
 func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
+	return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
+} + +func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) { + + ctx, cancel := context.WithCancel(ctx) ms := &ManagedStream{ streamSettings: defaultStreamSettings(), c: c, + ctx: ctx, + cancel: cancel, + open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) { + arc, err := streamFunc(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024))) + if err != nil { + return nil, err + } + return arc, nil + }, } // apply writer options @@ -66,28 +96,31 @@ func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*M opt(ms) } - if err := c.validateOptions(ctx, ms); err != nil { - return nil, err - } + // skipSetup exists for testing scenarios. + if !skipSetup { + if err := c.validateOptions(ctx, ms); err != nil { + return nil, err + } - if ms.streamSettings.streamID == "" { - // not instantiated with a stream, construct one. - streamName := fmt.Sprintf("%s/_default", ms.destinationTable) - if ms.streamSettings.streamType != DefaultStream { - // For everything but a default stream, we create a new stream on behalf of the user. - req := &storagepb.CreateWriteStreamRequest{ - Parent: ms.destinationTable, - WriteStream: &storagepb.WriteStream{ - Type: streamTypeToEnum(ms.streamSettings.streamType), - }} - resp, err := ms.c.rawClient.CreateWriteStream(ctx, req) - if err != nil { - return nil, fmt.Errorf("couldn't create write stream: %v", err) + if ms.streamSettings.streamID == "" { + // not instantiated with a stream, construct one. + streamName := fmt.Sprintf("%s/_default", ms.destinationTable) + if ms.streamSettings.streamType != DefaultStream { + // For everything but a default stream, we create a new stream on behalf of the user. + req := &storagepb.CreateWriteStreamRequest{ + Parent: ms.destinationTable, + WriteStream: &storagepb.WriteStream{ + Type: streamTypeToEnum(ms.streamSettings.streamType), + }} + resp, err := ms.c.rawClient.CreateWriteStream(ctx, req) + if err != nil { + return nil, fmt.Errorf("couldn't create write stream: %v", err) + } + streamName = resp.GetName() } - streamName = resp.GetName() + ms.streamSettings.streamID = streamName + // TODO(followup CLs): instantiate an appendstream client, flow controller, etc. } - ms.streamSettings.streamID = streamName - // TODO(followup CLs): instantiate an appendstream client, flow controller, etc. } return ms, nil diff --git a/bigquery/storage/managedwriter/doc.go b/bigquery/storage/managedwriter/doc.go index a8e580bd90c..5fb6dfd6f54 100644 --- a/bigquery/storage/managedwriter/doc.go +++ b/bigquery/storage/managedwriter/doc.go @@ -14,7 +14,8 @@ // Package managedwriter will be a thick client around the storage API's BigQueryWriteClient. // -// It is EXPERIMENTAL and subject to change or removal without notice. +// It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha +// state, and breaking changes are frequent. // // Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is // a successor to the streaming interface. 
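+//
+// A minimal usage sketch, assuming the surface this PR introduces (NewClient,
+// NewManagedStream, the writer options, and AppendRows); tableName,
+// descriptorProto, and rowData stand in for caller-supplied values, and the
+// surface may change while the package is pre-alpha:
+//
+//	client, err := managedwriter.NewClient(ctx, projectID)
+//	if err != nil {
+//		// TODO: handle error.
+//	}
+//	defer client.Close()
+//	ms, err := client.NewManagedStream(ctx,
+//		managedwriter.WithDestinationTable(tableName),
+//		managedwriter.WithType(managedwriter.DefaultStream),
+//		managedwriter.WithSchemaDescriptor(descriptorProto))
+//	if err != nil {
+//		// TODO: handle error.
+//	}
+//	results, err := ms.AppendRows(rowData, managedwriter.NoStreamOffset)
+//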
API method tabledata.insertAll is the primary backend method, and
diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go
new file mode 100644
index 00000000000..4b276b2e7aa
--- /dev/null
+++ b/bigquery/storage/managedwriter/integration_test.go
@@ -0,0 +1,207 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"cloud.google.com/go/bigquery"
+	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
+	"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
+	"cloud.google.com/go/internal/testutil"
+	"cloud.google.com/go/internal/uid"
+	"google.golang.org/api/option"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/reflect/protodesc"
+	"google.golang.org/protobuf/reflect/protoreflect"
+	"google.golang.org/protobuf/types/descriptorpb"
+)
+
+var (
+	datasetIDs         = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
+	tableIDs           = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
+	defaultTestTimeout = 15 * time.Second
+)
+
+func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
+	if testing.Short() {
+		t.Skip("Integration tests skipped in short mode")
+	}
+	projID := testutil.ProjID()
+	if projID == "" {
+		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+	}
+	ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
+	if ts == nil {
+		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+	}
+	opts = append(opts, option.WithTokenSource(ts))
+	client, err := NewClient(ctx, projID, opts...)
+	if err != nil {
+		t.Fatalf("couldn't create managedwriter client: %v", err)
+	}
+
+	bqClient, err := bigquery.NewClient(ctx, projID, opts...)
+	if err != nil {
+		t.Fatalf("couldn't create bigquery client: %v", err)
+	}
+	return client, bqClient
+}
+
+// validateRowCount confirms the number of rows in a table visible to the query engine.
+func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) {
+
+	// Verify data is present in the table with a count query.
+	sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID)
+	q := client.Query(sql)
+	it, err := q.Read(ctx)
+	if err != nil {
+		t.Errorf("failed to issue validation query: %v", err)
+		return
+	}
+	var rowdata []bigquery.Value
+	err = it.Next(&rowdata)
+	if err != nil {
+		t.Errorf("error fetching validation results: %v", err)
+		return
+	}
+	count, ok := rowdata[0].(int64)
+	if !ok {
+		t.Errorf("got unexpected data from validation query: %v", rowdata[0])
+	}
+	if count != expectedRows {
+		t.Errorf("row count mismatch: got %d want %d", count, expectedRows)
+	}
+}
+
+// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
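+//
+// The cleanup func is intended for defer; a sketch matching how the test
+// below consumes this helper:
+//
+//	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient)
+//	if err != nil {
+//		t.Fatalf("failed to init test dataset: %v", err)
+//	}
+//	defer cleanup()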
+func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) {
+	dataset := bqc.Dataset(datasetIDs.New())
+	if err := dataset.Create(ctx, nil); err != nil {
+		return nil, nil, err
+	}
+	return dataset, func() {
+		if err := dataset.DeleteWithContents(ctx); err != nil {
+			t.Logf("could not cleanup dataset %s: %v", dataset.DatasetID, err)
+		}
+	}, nil
+}
+
+// setupDynamicDescriptors aids testing when not using a supplied proto.
+func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
+	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
+	if err != nil {
+		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
+	}
+
+	descriptor, err := adapt.StorageSchemaToDescriptor(convertedSchema, "root")
+	if err != nil {
+		t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
+	}
+	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
+	if !ok {
+		t.Fatalf("adapted descriptor is not a message descriptor")
+	}
+	return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
+}
+
+func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
+	mwClient, bqClient := getTestClients(context.Background(), t)
+	defer mwClient.Close()
+	defer bqClient.Close()
+
+	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
+	if err != nil {
+		t.Fatalf("failed to init test dataset: %v", err)
+	}
+	defer cleanup()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	// prep a suitable destination table.
+	testTable := dataset.Table(tableIDs.New())
+	schema := bigquery.Schema{
+		{Name: "name", Type: bigquery.StringFieldType, Required: true},
+		{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
+	}
+	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
+		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
+	}
+	// We'll use a precompiled test proto, but we need its corresponding DescriptorProto
+	// representation to send as the stream's schema.
+	m := &testdata.SimpleMessage{}
+	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
+
+	// setup a new stream.
+	ms, err := mwClient.NewManagedStream(ctx,
+		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
+		WithType(DefaultStream),
+		WithSchemaDescriptor(descriptorProto),
+	)
+	if err != nil {
+		t.Fatalf("NewManagedStream: %v", err)
+	}
+
+	// prevalidate we have no data in table.
+	validateRowCount(ctx, t, bqClient, testTable, 0)
+
+	testData := []*testdata.SimpleMessage{
+		{Name: "one", Value: 1},
+		{Name: "two", Value: 2},
+		{Name: "three", Value: 3},
+		{Name: "four", Value: 1},
+		{Name: "five", Value: 2},
+	}
+
+	// First, send the test rows individually.
+	var results []*AppendResult
+	for k, mesg := range testData {
+		b, err := proto.Marshal(mesg)
+		if err != nil {
+			t.Errorf("failed to marshal message %d: %v", k, err)
+		}
+		data := [][]byte{b}
+		results, err = ms.AppendRows(data, NoStreamOffset)
+		if err != nil {
+			t.Errorf("single-row append %d failed: %v", k, err)
+		}
+	}
+	// wait for the result to indicate ready, then validate.
+	<-results[0].Ready()
+	wantRows := int64(len(testData))
+	validateRowCount(ctx, t, bqClient, testTable, wantRows)
+
+	// Now, send the test rows grouped into a single append.
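+	// (AppendRows accepts a batch of serialized rows as a [][]byte, so the
+	// messages below are marshaled individually but travel in one request.)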
+	var data [][]byte
+	for k, mesg := range testData {
+		b, err := proto.Marshal(mesg)
+		if err != nil {
+			t.Errorf("failed to marshal message %d: %v", k, err)
+		}
+		data = append(data, b)
+	}
+	results, err = ms.AppendRows(data, NoStreamOffset)
+	if err != nil {
+		t.Errorf("grouped-row append failed: %v", err)
+	}
+	// wait for the result to indicate ready, then validate again.
+	<-results[0].Ready()
+	wantRows = wantRows * 2
+	validateRowCount(ctx, t, bqClient, testTable, wantRows)
+}
diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go
index 93854911300..296e8b491fd 100644
--- a/bigquery/storage/managedwriter/managed_stream.go
+++ b/bigquery/storage/managedwriter/managed_stream.go
@@ -16,8 +16,14 @@ package managedwriter
 
 import (
 	"context"
+	"fmt"
+	"io"
+	"sync"
 
+	"github.com/googleapis/gax-go/v2"
 	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+	grpcstatus "google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/descriptorpb"
 	"google.golang.org/protobuf/types/known/wrapperspb"
 )
 
@@ -62,10 +68,25 @@ func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
 // ManagedStream is the abstraction over a single write stream.
 type ManagedStream struct {
 	streamSettings   *streamSettings
+	schemaDescriptor *descriptorpb.DescriptorProto
 	destinationTable string
 	c                *Client
+
+	// aspects of the stream client
+	ctx    context.Context // retained context for the stream
+	cancel context.CancelFunc
+	open   func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
+
+	mu          sync.Mutex
+	arc         *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
+	err         error                                     // terminal error
+	pending     chan *pendingWrite                        // writes awaiting status
+	streamSetup *sync.Once                                // handles amending the first request in a new stream
 }
 
+// enables testing
+type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
+
 // streamSettings govern behavior of the append stream RPCs.
 type streamSettings struct {
@@ -140,3 +161,188 @@ func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
 	}
 	return resp.GetRowCount(), nil
 }
+
+// getStream returns either a valid ARC client stream or a permanent error.
+//
+// Calling getStream locks the mutex.
+func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
+	ms.mu.Lock()
+	defer ms.mu.Unlock()
+	if ms.err != nil {
+		return nil, nil, ms.err
+	}
+	ms.err = ms.ctx.Err()
+	if ms.err != nil {
+		return nil, nil, ms.err
+	}
+
+	// Always return the retained ARC if the arg differs.
+	if arc != ms.arc {
+		return ms.arc, ms.pending, nil
+	}
+
+	ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
+	*ms.arc, ms.pending, ms.err = ms.openWithRetry()
+	return ms.arc, ms.pending, ms.err
+}
+
+// openWithRetry is responsible for navigating the (re)opening of the underlying stream connection.
+//
+// Only getStream() should call this, and thus the calling code has the mutex lock.
+func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
+	r := defaultRetryer{}
+	for {
+		arc, err := ms.open()
+		bo, shouldRetry := r.Retry(err)
+		if err != nil && shouldRetry {
+			if err := gax.Sleep(ms.ctx, bo); err != nil {
+				return nil, nil, err
+			}
+			continue
+		}
+		if err == nil {
+			// The channel relationship with its ARC is 1:1.
If we get a new ARC, create a new chan + // and fire up the associated receive processor. + ch := make(chan *pendingWrite) + go recvProcessor(ms.ctx, arc, ch) + // Also, replace the sync.Once for setting up a new stream, as we need to do "special" work + // for every new connection. + ms.streamSetup = new(sync.Once) + return arc, ch, nil + } + return arc, nil, err + } +} + +func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error { + var settings gax.CallSettings + for _, opt := range opts { + opt.Resolve(&settings) + } + var r gax.Retryer = &defaultRetryer{} + if settings.Retry != nil { + r = settings.Retry() + } + + var arc *storagepb.BigQueryWrite_AppendRowsClient + var ch chan *pendingWrite + var err error + + for { + arc, ch, err = ms.getStream(arc) + if err != nil { + return err + } + var req *storagepb.AppendRowsRequest + ms.streamSetup.Do(func() { + reqCopy := *pw.request + reqCopy.WriteStream = ms.streamSettings.streamID + reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ + ProtoDescriptor: ms.schemaDescriptor, + } + reqCopy.TraceId = ms.streamSettings.TracePrefix + req = &reqCopy + }) + + var err error + if req == nil { + err = (*arc).Send(pw.request) + } else { + // we had to amend the initial request + err = (*arc).Send(req) + } + if err != nil { + bo, shouldRetry := r.Retry(err) + if shouldRetry { + if err := gax.Sleep(ms.ctx, bo); err != nil { + return err + } + continue + } + ms.mu.Lock() + ms.err = err + ms.mu.Unlock() + } + if err == nil { + ch <- pw + } + return err + } +} + +// Close closes a managed stream. +func (ms *ManagedStream) Close() error { + + var arc *storagepb.BigQueryWrite_AppendRowsClient + + arc, ch, err := ms.getStream(arc) + if err != nil { + return err + } + if ms.arc == nil { + return fmt.Errorf("no stream exists") + } + err = (*arc).CloseSend() + if err == nil { + close(ch) + } + ms.mu.Lock() + ms.err = io.EOF + ms.mu.Unlock() + return err +} + +// AppendRows sends the append requests to the service, and returns one AppendResult per row. +func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResult, error) { + pw := newPendingWrite(data, offset) + if err := ms.append(pw); err != nil { + // pending write is DOA, mark it done. + pw.markDone(NoStreamOffset, err) + return nil, err + } + return pw.results, nil +} + +// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. +// +// The receive processor only deals with a single instance of a connection/channel, and thus should never interact +// with the mutex lock. +func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { + // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply + // ensure that pending writes get acknowledged with a terminal state. + for { + select { + case <-ctx.Done(): + // Context is done, so we're not going to get further updates. Mark all work failed with the context error. + for { + pw, ok := <-ch + if !ok { + return + } + pw.markDone(NoStreamOffset, ctx.Err()) + } + case nextWrite, ok := <-ch: + if !ok { + // Channel closed, all elements processed. + return + } + + // block until we get a corresponding response or err from stream. 
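+			// (This pairing assumes the service acknowledges appends in the
+			// order they were sent on the connection; the pending channel is
+			// FIFO, so the oldest in-flight write meets the next response.)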
+			resp, err := arc.Recv()
+			if err != nil {
+				nextWrite.markDone(NoStreamOffset, err)
+				continue
+			}
+
+			if status := resp.GetError(); status != nil {
+				nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status))
+				continue
+			}
+			success := resp.GetAppendResult()
+			off := success.GetOffset()
+			if off != nil {
+				nextWrite.markDone(off.GetValue(), nil)
+				continue
+			}
+			nextWrite.markDone(NoStreamOffset, nil)
+		}
+	}
+}
diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go
new file mode 100644
index 00000000000..ba9f12eb5aa
--- /dev/null
+++ b/bigquery/storage/managedwriter/managed_stream_test.go
@@ -0,0 +1,183 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"context"
+	"testing"
+
+	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/descriptorpb"
+)
+
+func TestManagedStream_OpenWithRetry(t *testing.T) {
+
+	testCases := []struct {
+		desc     string
+		errors   []error
+		wantFail bool
+	}{
+		{
+			desc:     "no error",
+			errors:   []error{nil},
+			wantFail: false,
+		},
+		{
+			desc: "transient failures",
+			errors: []error{
+				status.Errorf(codes.Unavailable, "try 1"),
+				status.Errorf(codes.Unavailable, "try 2"),
+				nil},
+			wantFail: false,
+		},
+		{
+			desc:     "terminal error",
+			errors:   []error{status.Errorf(codes.InvalidArgument, "bad args")},
+			wantFail: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		ms := &ManagedStream{
+			ctx: context.Background(),
+			open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
+				if len(tc.errors) == 0 {
+					panic("out of errors")
+				}
+				err := tc.errors[0]
+				tc.errors = tc.errors[1:]
+				if err == nil {
+					return &testAppendRowsClient{}, nil
+				}
+				return nil, err
+			},
+		}
+		arc, ch, err := ms.openWithRetry()
+		if tc.wantFail && err == nil {
+			t.Errorf("case %s: wanted failure, got success", tc.desc)
+		}
+		if !tc.wantFail && err != nil {
+			t.Errorf("case %s: wanted success, got %v", tc.desc, err)
+		}
+		if err == nil {
+			if arc == nil {
+				t.Errorf("case %s: expected append client, got nil", tc.desc)
+			}
+			if ch == nil {
+				t.Errorf("case %s: expected channel, got nil", tc.desc)
+			}
+		}
+	}
+}
+
+func TestManagedStream_FirstAppendBehavior(t *testing.T) {
+
+	var testARC *testAppendRowsClient
+	testARC = &testAppendRowsClient{
+		recvF: func() (*storagepb.AppendRowsResponse, error) {
+			return &storagepb.AppendRowsResponse{
+				Response: &storagepb.AppendRowsResponse_AppendResult_{},
+			}, nil
+		},
+		sendF: func(req *storagepb.AppendRowsRequest) error {
+			testARC.requests = append(testARC.requests, req)
+			return nil
+		},
+	}
+	schema := &descriptorpb.DescriptorProto{
+		Name: proto.String("testDescriptor"),
+	}
+
+	ms := &ManagedStream{
+		ctx: context.Background(),
+		open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
+			testARC.openCount = testARC.openCount + 1
+			return testARC, nil
+		},
+		streamSettings: defaultStreamSettings(),
+	}
+	ms.streamSettings.streamID = "FOO"
+	ms.streamSettings.TracePrefix = "TRACE"
+	ms.schemaDescriptor = schema
+
+	fakeData := [][]byte{
+		[]byte("foo"),
+		[]byte("bar"),
+	}
+
+	wantReqs := 3
+
+	for i := 0; i < wantReqs; i++ {
+		_, err := ms.AppendRows(fakeData, NoStreamOffset)
+		if err != nil {
+			t.Errorf("AppendRows: %v", err)
+		}
+	}
+
+	if testARC.openCount != 1 {
+		t.Errorf("expected a single open, got %d", testARC.openCount)
+	}
+
+	if len(testARC.requests) != wantReqs {
+		t.Errorf("expected %d requests, got %d", wantReqs, len(testARC.requests))
+	}
+
+	for k, v := range testARC.requests {
+		if v == nil {
+			t.Errorf("request %d was nil", k)
+		}
+		if k == 0 {
+			if v.GetTraceId() == "" {
+				t.Errorf("expected TraceId on first request, was empty")
+			}
+			if v.GetWriteStream() == "" {
+				t.Errorf("expected WriteStream on first request, was empty")
+			}
+			if v.GetProtoRows().GetWriterSchema().GetProtoDescriptor() == nil {
+				t.Errorf("expected WriterSchema on first request, was empty")
+			}
+
+		} else {
+			if v.GetTraceId() != "" {
+				t.Errorf("expected no TraceID on request %d, got %s", k, v.GetTraceId())
+			}
+			if v.GetWriteStream() != "" {
+				t.Errorf("expected no WriteStream on request %d, got %s", k, v.GetWriteStream())
+			}
+			if v.GetProtoRows().GetWriterSchema().GetProtoDescriptor() != nil {
+				t.Errorf("expected no WriterSchema on request %d, got %s", k, v.GetProtoRows().GetWriterSchema().GetProtoDescriptor().String())
+			}
+		}
+	}
+}
+
+type testAppendRowsClient struct {
+	storagepb.BigQueryWrite_AppendRowsClient
+	openCount int
+	requests  []*storagepb.AppendRowsRequest
+	sendF     func(*storagepb.AppendRowsRequest) error
+	recvF     func() (*storagepb.AppendRowsResponse, error)
+}
+
+func (tarc *testAppendRowsClient) Send(req *storagepb.AppendRowsRequest) error {
+	return tarc.sendF(req)
+}
+
+func (tarc *testAppendRowsClient) Recv() (*storagepb.AppendRowsResponse, error) {
+	return tarc.recvF()
+}
diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go
new file mode 100644
index 00000000000..1f272a99932
--- /dev/null
+++ b/bigquery/storage/managedwriter/retry.go
@@ -0,0 +1,43 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"time"
+
+	"github.com/googleapis/gax-go/v2"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+type defaultRetryer struct {
+	bo gax.Backoff
+}
+
+func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
+	// TODO: refine this logic in a subsequent PR; there are service-specific
+	// retry predicates in addition to the status-code-based ones.
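+	// (Assumption worth noting: a zero-valued gax.Backoff fills in its
+	// documented defaults on the first Pause call, so bo needs no explicit
+	// initialization here.)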
+ s, ok := status.FromError(err) + if !ok { + // non-status based errors as retryable + return r.bo.Pause(), true + } + switch s.Code() { + case codes.Unavailable: + return r.bo.Pause(), true + default: + return r.bo.Pause(), false + } +} diff --git a/bigquery/storage/managedwriter/testdata/messages.pb.go b/bigquery/storage/managedwriter/testdata/messages.pb.go new file mode 100644 index 00000000000..162b0d9c537 --- /dev/null +++ b/bigquery/storage/managedwriter/testdata/messages.pb.go @@ -0,0 +1,177 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.10.1 +// source: messages.proto + +package testdata + +import ( + reflect "reflect" + sync "sync" + + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +// SimpleMessage represents a simple message that transmits a string and int64 value. +type SimpleMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // name is a simple scalar string. + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // value is a simple int64 value. + Value int64 `protobuf:"varint,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *SimpleMessage) Reset() { + *x = SimpleMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SimpleMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SimpleMessage) ProtoMessage() {} + +func (x *SimpleMessage) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SimpleMessage.ProtoReflect.Descriptor instead. 
+func (*SimpleMessage) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{0} +} + +func (x *SimpleMessage) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *SimpleMessage) GetValue() int64 { + if x != nil { + return x.Value + } + return 0 +} + +var File_messages_proto protoreflect.FileDescriptor + +var file_messages_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x08, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x22, 0x39, 0x0a, 0x0d, 0x53, 0x69, + 0x6d, 0x70, 0x6c, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x67, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, 0x61, + 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_messages_proto_rawDescOnce sync.Once + file_messages_proto_rawDescData = file_messages_proto_rawDesc +) + +func file_messages_proto_rawDescGZIP() []byte { + file_messages_proto_rawDescOnce.Do(func() { + file_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_messages_proto_rawDescData) + }) + return file_messages_proto_rawDescData +} + +var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_messages_proto_goTypes = []interface{}{ + (*SimpleMessage)(nil), // 0: testdata.SimpleMessage +} +var file_messages_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_messages_proto_init() } +func file_messages_proto_init() { + if File_messages_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_messages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SimpleMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_messages_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_messages_proto_goTypes, + DependencyIndexes: file_messages_proto_depIdxs, + MessageInfos: file_messages_proto_msgTypes, + }.Build() + File_messages_proto = out.File + file_messages_proto_rawDesc = nil + file_messages_proto_goTypes = nil + file_messages_proto_depIdxs = nil +} diff --git a/bigquery/storage/managedwriter/testdata/messages.proto b/bigquery/storage/managedwriter/testdata/messages.proto new file mode 100644 index 00000000000..9b77e539ef5 --- /dev/null +++ b/bigquery/storage/managedwriter/testdata/messages.proto @@ -0,0 +1,28 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may 
not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package testdata; +option go_package = "cloud.google.com/go/bigquery/storage/managedwriter/testdata"; + + +// SimpleMessage represents a simple message that transmits a string and int64 value. +message SimpleMessage { + // name is a simple scalar string. + string name = 1; + // value is a simple int64 value. + int64 value = 2; +} + + diff --git a/bigquery/storage/managedwriter/writer_option.go b/bigquery/storage/managedwriter/writer_option.go index e0032788436..180418c5ff1 100644 --- a/bigquery/storage/managedwriter/writer_option.go +++ b/bigquery/storage/managedwriter/writer_option.go @@ -14,6 +14,8 @@ package managedwriter +import "google.golang.org/protobuf/types/descriptorpb" + // WriterOption is used to configure a ManagedWriteClient. type WriterOption func(*ManagedStream) @@ -67,3 +69,10 @@ func WithTracePrefix(prefix string) WriterOption { ms.streamSettings.TracePrefix = prefix } } + +// WithSchemaDescriptor describes the format of messages you'll be sending to the service. +func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { + return func(ms *ManagedStream) { + ms.schemaDescriptor = dp + } +} diff --git a/bigquery/storage/managedwriter/writer_option_test.go b/bigquery/storage/managedwriter/writer_option_test.go index c1e754fed1a..929f8b2292e 100644 --- a/bigquery/storage/managedwriter/writer_option_test.go +++ b/bigquery/storage/managedwriter/writer_option_test.go @@ -15,6 +15,7 @@ package managedwriter import ( + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -110,7 +111,8 @@ func TestWriterOptions(t *testing.T) { } if diff := cmp.Diff(got, tc.want, - cmp.AllowUnexported(ManagedStream{}, streamSettings{})); diff != "" { + cmp.AllowUnexported(ManagedStream{}, streamSettings{}), + cmp.AllowUnexported(sync.Mutex{})); diff != "" { t.Errorf("diff in case (%s):\n%v", tc.desc, diff) } }
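A sketch of obtaining the DescriptorProto that WithSchemaDescriptor expects,
mirroring the integration test in this patch (testdata.SimpleMessage is the
message added above; client and ctx are caller-supplied):

	m := &testdata.SimpleMessage{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	ms, err := client.NewManagedStream(ctx, WithSchemaDescriptor(descriptorProto))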