diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go
new file mode 100644
index 00000000000..4d0bd68bdc0
--- /dev/null
+++ b/bigquery/storage/managedwriter/client.go
@@ -0,0 +1,161 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"context"
+	"fmt"
+	"runtime"
+	"strings"
+
+	storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
+	"google.golang.org/api/option"
+	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+)
+
+// Client is a managed BigQuery Storage write client scoped to a single project.
+type Client struct {
+	rawClient *storage.BigQueryWriteClient
+	projectID string
+}
+
+// NewClient instantiates a new client.
+func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
+	numConns := runtime.GOMAXPROCS(0)
+	if numConns > 4 {
+		numConns = 4
+	}
+	o := []option.ClientOption{
+		option.WithGRPCConnectionPool(numConns),
+	}
+	o = append(o, opts...)
+
+	rawClient, err := storage.NewBigQueryWriteClient(ctx, o...)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Client{
+		rawClient: rawClient,
+		projectID: projectID,
+	}, nil
+}
+
+// NewManagedStream establishes a new managed stream for appending data into a table.
+func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
+
+	ms := &ManagedStream{
+		streamSettings: defaultStreamSettings(),
+		c:              c,
+	}
+
+	// apply writer options
+	for _, opt := range opts {
+		opt(ms)
+	}
+
+	if err := c.validateOptions(ctx, ms); err != nil {
+		return nil, err
+	}
+
+	if ms.streamSettings.streamID == "" {
+		// not instantiated with a stream, construct one.
+		streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
+		if ms.streamSettings.streamType != DefaultStream {
+			// For everything but a default stream, we create a new stream on behalf of the user.
+			req := &storagepb.CreateWriteStreamRequest{
+				Parent: ms.destinationTable,
+				WriteStream: &storagepb.WriteStream{
+					Type: streamTypeToEnum(ms.streamSettings.streamType),
+				}}
+			resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
+			if err != nil {
+				return nil, fmt.Errorf("couldn't create write stream: %v", err)
+			}
+			streamName = resp.GetName()
+		}
+		ms.streamSettings.streamID = streamName
+		// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
+	}
+
+	return ms, nil
+}
+
+// validateOptions is used to validate that we received a sane/compatible set of WriterOptions
+// for constructing a new managed stream.
+func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
+	if ms == nil {
+		return fmt.Errorf("no managed stream definition")
+	}
+	if ms.streamSettings.streamID != "" {
+		// User supplied a stream, so we need to verify it exists.
+		info, err := c.getWriteStream(ctx, ms.streamSettings.streamID)
+		if err != nil {
+			return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
+		}
+		// update type and destination based on stream metadata
+		ms.streamSettings.streamType = StreamType(info.Type.String())
+		ms.destinationTable = tableParentFromStreamName(ms.streamSettings.streamID)
+	}
+	if ms.destinationTable == "" {
+		return fmt.Errorf("no destination table specified")
+	}
+	// we could auto-select DEFAULT here, but let's force users to be specific for now.
+	if ms.StreamType() == "" {
+		return fmt.Errorf("stream type wasn't specified")
+	}
+	return nil
+}
+
+// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
+// as a single transaction. Streams must be finalized before committing.
+//
+// TODO: this currently exposes the raw proto response, but a future CL will wrap this with a nicer type.
+func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {
+
+	// determine table from first streamName, as all must share the same table.
+	if len(streamNames) <= 0 {
+		return nil, fmt.Errorf("no streamnames provided")
+	}
+
+	req := &storagepb.BatchCommitWriteStreamsRequest{
+		Parent:       tableParentFromStreamName(streamNames[0]),
+		WriteStreams: streamNames,
+	}
+	return c.rawClient.BatchCommitWriteStreams(ctx, req)
+}
+
+// getWriteStream returns information about a given write stream.
+//
+// It's primarily used for setup validation, and not exposed directly to end users.
+func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) {
+	req := &storagepb.GetWriteStreamRequest{
+		Name: streamName,
+	}
+	return c.rawClient.GetWriteStream(ctx, req)
+}
+
+// tableParentFromStreamName returns the corresponding parent table
+// identifier given a fully qualified streamname.
+func tableParentFromStreamName(streamName string) string {
+	// Stream IDs have the following prefix:
+	// projects/{project}/datasets/{dataset}/tables/{table}/blah
+	parts := strings.SplitN(streamName, "/", 7)
+	if len(parts) < 7 {
+		// invalid; just pass back the input
+		return streamName
+	}
+	return strings.Join(parts[:6], "/")
+}
diff --git a/bigquery/storage/managedwriter/client_test.go b/bigquery/storage/managedwriter/client_test.go
new file mode 100644
index 00000000000..81d91e5ca13
--- /dev/null
+++ b/bigquery/storage/managedwriter/client_test.go
@@ -0,0 +1,48 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import "testing"
+
+func TestTableParentFromStreamName(t *testing.T) {
+	testCases := []struct {
+		in   string
+		want string
+	}{
+		{
+			"bad",
+			"bad",
+		},
+		{
+			"projects/foo/datasets/bar/tables/baz",
+			"projects/foo/datasets/bar/tables/baz",
+		},
+		{
+			"projects/foo/datasets/bar/tables/baz/zip/zam/zoomie",
+			"projects/foo/datasets/bar/tables/baz",
+		},
+		{
+			"projects/foo/datasets/bar/tables/baz/_default",
+			"projects/foo/datasets/bar/tables/baz",
+		},
+	}
+
+	for _, tc := range testCases {
+		got := tableParentFromStreamName(tc.in)
+		if got != tc.want {
+			t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
+		}
+	}
+}
diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go
new file mode 100644
index 00000000000..93854911300
--- /dev/null
+++ b/bigquery/storage/managedwriter/managed_stream.go
@@ -0,0 +1,142 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"context"
+
+	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+	"google.golang.org/protobuf/types/known/wrapperspb"
+)
+
+// StreamType indicates the type of stream this write client is managing.
+type StreamType string
+
+var (
+	// DefaultStream most closely mimics the legacy bigquery
+	// tabledata.insertAll semantics. Successful inserts are
+	// committed immediately, and there's no offset tracking, as
+	// all writes go into a "default" stream that always exists
+	// for a table.
+	DefaultStream StreamType = "DEFAULT"
+
+	// CommittedStream appends data immediately, but creates a
+	// discrete stream for the work so that offset tracking can
+	// be used to track writes.
+	CommittedStream StreamType = "COMMITTED"
+
+	// BufferedStream is a form of checkpointed stream that allows
+	// you to advance the offset of visible rows via Flush operations.
+	BufferedStream StreamType = "BUFFERED"
+
+	// PendingStream is a stream in which no data is made visible to
+	// readers until the stream is finalized and committed explicitly.
+	PendingStream StreamType = "PENDING"
+)
+
+func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
+	switch t {
+	case CommittedStream:
+		return storagepb.WriteStream_COMMITTED
+	case PendingStream:
+		return storagepb.WriteStream_PENDING
+	case BufferedStream:
+		return storagepb.WriteStream_BUFFERED
+	default:
+		return storagepb.WriteStream_TYPE_UNSPECIFIED
+	}
+}
+
+// ManagedStream is the abstraction over a single write stream.
+type ManagedStream struct {
+	streamSettings   *streamSettings
+	destinationTable string
+	c                *Client
+}
+
+// streamSettings govern behavior of the append stream RPCs.
+type streamSettings struct {
+
+	// streamID contains the reference to the destination stream.
+	streamID string
+
+	// streamType governs behavior of the client, such as how
+	// offset handling is managed.
+	streamType StreamType
+
+	// MaxInflightRequests governs how many unacknowledged
+	// append writes can be outstanding into the system.
+	MaxInflightRequests int
+
+	// MaxInflightBytes governs how many unacknowledged
+	// request bytes can be outstanding into the system.
+	MaxInflightBytes int
+
+	// TracePrefix sets a suitable prefix for the trace ID set on
+	// append requests. Useful for diagnostic purposes.
+	TracePrefix string
+}
+
+func defaultStreamSettings() *streamSettings {
+	return &streamSettings{
+		streamType:          DefaultStream,
+		MaxInflightRequests: 1000,
+		MaxInflightBytes:    0,
+		TracePrefix:         "defaultManagedWriter",
+	}
+}
+
+// StreamName returns the corresponding write stream ID being managed by this writer.
+func (ms *ManagedStream) StreamName() string {
+	return ms.streamSettings.streamID
+}
+
+// StreamType returns the configured type for this stream.
+func (ms *ManagedStream) StreamType() StreamType {
+	return ms.streamSettings.streamType
+}
+
+// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling
+// this method for other stream types yields an error.
+func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
+	req := &storagepb.FlushRowsRequest{
+		WriteStream: ms.streamSettings.streamID,
+		Offset: &wrapperspb.Int64Value{
+			Value: offset,
+		},
+	}
+	resp, err := ms.c.rawClient.FlushRows(ctx, req)
+	if err != nil {
+		return 0, err
+	}
+	return resp.GetOffset(), nil
+}
+
+// Finalize is used to mark a stream as complete, and thus ensure no further data can
+// be appended to the stream. You cannot finalize a DefaultStream, as it always exists.
+//
+// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
+// data in a PendingStream.
+func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
+	// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
+	req := &storagepb.FinalizeWriteStreamRequest{
+		Name: ms.streamSettings.streamID,
+	}
+	resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
+	if err != nil {
+		return 0, err
+	}
+	return resp.GetRowCount(), nil
+}
diff --git a/bigquery/storage/managedwriter/writer_option.go b/bigquery/storage/managedwriter/writer_option.go
new file mode 100644
index 00000000000..e0032788436
--- /dev/null
+++ b/bigquery/storage/managedwriter/writer_option.go
@@ -0,0 +1,69 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+// WriterOption is used to configure a ManagedStream.
+type WriterOption func(*ManagedStream)
+
+// WithType sets the write type of the new writer.
+func WithType(st StreamType) WriterOption {
+	return func(ms *ManagedStream) {
+		ms.streamSettings.streamType = st
+	}
+}
+
+// WithStreamName allows users to set the stream name this writer will
+// append to explicitly. By default, the managed client will create the
+// stream when instantiated.
+//
+// Note: Supplying this option causes other options such as WithType
+// and WithDestinationTable to be ignored.
func WithStreamName(name string) WriterOption {
	return func(ms *ManagedStream) {
		ms.streamSettings.streamID = name
	}
}
+
+// WithDestinationTable specifies the destination table to which a created
+// stream will append rows. Format of the table:
+//
+//   projects/{projectid}/datasets/{dataset}/tables/{table}
+func WithDestinationTable(destTable string) WriterOption {
+	return func(ms *ManagedStream) {
+		ms.destinationTable = destTable
+	}
+}
+
+// WithMaxInflightRequests bounds the inflight appends on the write connection.
+func WithMaxInflightRequests(n int) WriterOption {
+	return func(ms *ManagedStream) {
+		ms.streamSettings.MaxInflightRequests = n
+	}
+}
+
+// WithMaxInflightBytes bounds the inflight append request bytes on the write connection.
+func WithMaxInflightBytes(n int) WriterOption {
+	return func(ms *ManagedStream) {
+		ms.streamSettings.MaxInflightBytes = n
+	}
+}
+
+// WithTracePrefix instruments requests to the service with a custom trace prefix.
+// This is generally for diagnostic purposes only.
+func WithTracePrefix(prefix string) WriterOption {
+	return func(ms *ManagedStream) {
+		ms.streamSettings.TracePrefix = prefix
+	}
+}
diff --git a/bigquery/storage/managedwriter/writer_option_test.go b/bigquery/storage/managedwriter/writer_option_test.go
new file mode 100644
index 00000000000..c1e754fed1a
--- /dev/null
+++ b/bigquery/storage/managedwriter/writer_option_test.go
@@ -0,0 +1,117 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package managedwriter
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+func TestWriterOptions(t *testing.T) {
+
+	testCases := []struct {
+		desc    string
+		options []WriterOption
+		want    *ManagedStream
+	}{
+		{
+			desc:    "WithType",
+			options: []WriterOption{WithType(BufferedStream)},
+			want: func() *ManagedStream {
+				ms := &ManagedStream{
+					streamSettings: defaultStreamSettings(),
+				}
+				ms.streamSettings.streamType = BufferedStream
+				return ms
+			}(),
+		},
+		{
+			desc:    "WithMaxInflightRequests",
+			options: []WriterOption{WithMaxInflightRequests(2)},
+			want: func() *ManagedStream {
+				ms := &ManagedStream{
+					streamSettings: defaultStreamSettings(),
+				}
+				ms.streamSettings.MaxInflightRequests = 2
+				return ms
+			}(),
+		},
+		{
+			desc:    "WithMaxInflightBytes",
+			options: []WriterOption{WithMaxInflightBytes(5)},
+			want: func() *ManagedStream {
+				ms := &ManagedStream{
+					streamSettings: defaultStreamSettings(),
+				}
+				ms.streamSettings.MaxInflightBytes = 5
+				return ms
+			}(),
+		},
+		{
+			desc:    "WithTracePrefix",
+			options: []WriterOption{WithTracePrefix("foo")},
+			want: func() *ManagedStream {
+				ms := &ManagedStream{
+					streamSettings: defaultStreamSettings(),
+				}
+				ms.streamSettings.TracePrefix = "foo"
+				return ms
+			}(),
+		},
+		{
+			desc:    "WithDestinationTable",
+			options: []WriterOption{WithDestinationTable("foo")},
+			want: func() *ManagedStream {
+				ms := &ManagedStream{
+					streamSettings:   defaultStreamSettings(),
+					destinationTable: "foo",
+				}
+				return ms
+			}(),
+		},
+		{
+			desc: "multiple",
+			options: []WriterOption{
+				WithType(PendingStream),
+				WithMaxInflightBytes(5),
+				WithTracePrefix("pre"),
+			},
+			want: func() *ManagedStream {
+				ms := &ManagedStream{
+					streamSettings: defaultStreamSettings(),
+				}
+				ms.streamSettings.MaxInflightBytes = 5
+				ms.streamSettings.streamType = PendingStream
+				ms.streamSettings.TracePrefix = "pre"
+				return ms
+			}(),
+		},
+	}
+
+	for _, tc := range testCases {
+		got := &ManagedStream{
+			streamSettings: defaultStreamSettings(),
+		}
+		for _, o := range tc.options {
+			o(got)
+		}
+
+		if diff := cmp.Diff(got, tc.want,
+			cmp.AllowUnexported(ManagedStream{}, streamSettings{})); diff != "" {
+			t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
+		}
+	}
+}
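
A minimal usage sketch of the surface introduced in this change, assuming the package is imported from this repository as cloud.google.com/go/bigquery/storage/managedwriter. The project, dataset, table, and stream identifiers are hypothetical placeholders, and appending rows is out of scope here (the append path is deferred to the follow-up CLs referenced in the TODOs), so the sketch only covers client and stream construction.

// Sketch only: identifiers below are placeholders, not real resources.
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
	ctx := context.Background()

	// One client per project; it pools up to 4 gRPC connections internally.
	client, err := managedwriter.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}

	// A default-type stream only needs a destination table; the writer
	// synthesizes the ".../_default" stream name without a CreateWriteStream call.
	defaultWriter, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
	)
	if err != nil {
		log.Fatalf("NewManagedStream: %v", err)
	}
	log.Printf("writing to %s (%s)", defaultWriter.StreamName(), defaultWriter.StreamType())

	// Alternatively, attach to an existing stream by name; type and destination
	// are then resolved from the stream's metadata during validation.
	existing, err := client.NewManagedStream(ctx,
		managedwriter.WithStreamName("projects/my-project/datasets/my_dataset/tables/my_table/streams/some-stream-id"),
	)
	if err != nil {
		log.Fatalf("NewManagedStream: %v", err)
	}
	_ = existing
}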
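A second sketch, under the same assumptions, showing the explicit PendingStream lifecycle exposed here: create the stream, Finalize it so no further appends are possible, then commit it atomically via BatchCommit. The function name commitPending and the table argument are illustrative only.

// Sketch only: commitPending is a hypothetical helper, not part of this change.
package example

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

// commitPending walks a PendingStream from creation through commit.
func commitPending(ctx context.Context, client *managedwriter.Client, table string) error {
	pending, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable(table),
		managedwriter.WithType(managedwriter.PendingStream),
	)
	if err != nil {
		return err
	}

	// Rows would be appended here once the append path lands in a follow-up CL.

	// Finalize marks the stream complete; no further data can be appended.
	rowCount, err := pending.Finalize(ctx)
	if err != nil {
		return err
	}
	log.Printf("finalized %d rows on %s", rowCount, pending.StreamName())

	// Commit one or more finalized pending streams in a single transaction.
	// BatchCommit derives the parent table from the first stream name.
	resp, err := client.BatchCommit(ctx, table, []string{pending.StreamName()})
	if err != nil {
		return err
	}
	log.Printf("commit time: %v", resp.GetCommitTime())
	return nil
}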