Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): add base client #4422

Merged
merged 10 commits into from Jul 14, 2021
163 changes: 163 additions & 0 deletions bigquery/storage/managedwriter/client.go
@@ -0,0 +1,163 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
"context"
"fmt"
"runtime"
"strings"

storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
"google.golang.org/api/option"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
)

// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
rawClient *storage.BigQueryWriteClient
projectID string
}

// NewClient instantiates a new client.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
numConns := runtime.GOMAXPROCS(0)
if numConns > 4 {
numConns = 4
}
o := []option.ClientOption{
option.WithGRPCConnectionPool(numConns),
}
o = append(o, opts...)

rawClient, err := storage.NewBigQueryWriteClient(ctx, o...)
if err != nil {
return nil, err
}

return &Client{
rawClient: rawClient,
projectID: projectID,
}, nil
}

// NewManagedStream establishes a new stream for writing.
//
// destinationTable should be of the format: projects/{project}/datasets/{dataset}/tables/{table}
shollyman marked this conversation as resolved.
Show resolved Hide resolved
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {

ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
c: c,
}

// apply writer options
for _, opt := range opts {
opt(ms)
}

if err := c.validateOptions(ctx, ms); err != nil {
return nil, err
}

if ms.streamSettings.streamID == "" {
// not instantiated with a stream, construct one.
streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
if ms.streamSettings.streamType != DefaultStream {
// For everything but a default stream, we create a new stream on behalf of the user.
req := &storagepb.CreateWriteStreamRequest{
Parent: ms.destinationTable,
WriteStream: &storagepb.WriteStream{
Type: streamTypeToEnum(ms.streamSettings.streamType),
}}
resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
if err != nil {
return nil, fmt.Errorf("couldn't create write stream: %v", err)
}
streamName = resp.GetName()
}
ms.streamSettings.streamID = streamName
// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
}

return ms, nil
}

// validateOptions is used to validate that we received a sane/compatible set of WriterOptions
// for constructing a new managed stream.
func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
if ms == nil {
return fmt.Errorf("no managed stream definition")
}
if ms.streamSettings.streamID != "" {
// User supplied a stream, we need to verify it exists.
info, err := c.getWriteStream(ctx, ms.streamSettings.streamID)
if err != nil {
return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
}
// update type and destination based on stream metadata
ms.streamSettings.streamType = StreamType(info.Type.String())
ms.destinationTable = tableParentFromStreamName(ms.streamSettings.streamID)
}
if ms.destinationTable == "" {
return fmt.Errorf("no destination table specified")
}
// we could auto-select DEFAULT here, but let's force users to be specific for now.
if ms.StreamType() == "" {
return fmt.Errorf("stream type wasn't specified")
}
return nil
}

// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
// as a single transaction. Streams must be finalized before committing.
//
// TODO: this currently exposes the raw proto response, but a future CL will wrap this with a nicer type.
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {

// determine table from first streamName, as all must share the same table.
if len(streamNames) <= 0 {
return nil, fmt.Errorf("no streamnames provided")
}

req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: tableParentFromStreamName(streamNames[0]),
WriteStreams: streamNames,
}
return c.rawClient.BatchCommitWriteStreams(ctx, req)
}

// getWriteStream returns information about a given write stream.
//
// It's primarily used for setup validation, and not exposed directly to end users.
func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) {
req := &storagepb.GetWriteStreamRequest{
Name: streamName,
}
return c.rawClient.GetWriteStream(ctx, req)
}

// tableParentFromStreamName return the corresponding parent table
// identifier given a fully qualified streamname.
func tableParentFromStreamName(streamName string) string {
// Stream IDs have the following prefix:
// projects/{project}/datasets/{dataset}/tables/{table}/blah
parts := strings.SplitN(streamName, "/", 7)
if len(parts) < 7 {
// invalid; just pass back the input
return streamName
}
return strings.Join(parts[:6], "/")
}
48 changes: 48 additions & 0 deletions bigquery/storage/managedwriter/client_test.go
@@ -0,0 +1,48 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import "testing"

func TestTableParentFromStreamName(t *testing.T) {
testCases := []struct {
in string
want string
}{
{
"bad",
"bad",
},
{
"projects/foo/datasets/bar/tables/baz",
"projects/foo/datasets/bar/tables/baz",
},
{
"projects/foo/datasets/bar/tables/baz/zip/zam/zoomie",
"projects/foo/datasets/bar/tables/baz",
},
{
"projects/foo/datasets/bar/tables/baz/_default",
"projects/foo/datasets/bar/tables/baz",
},
}

for _, tc := range testCases {
got := tableParentFromStreamName(tc.in)
if got != tc.want {
t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
}
}
}
142 changes: 142 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -0,0 +1,142 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
"context"

storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// StreamType indicates the type of stream this write client is managing.
type StreamType string

var (
// DefaultStream most closely mimics the legacy bigquery
// tabledata.insertAll semantics. Successful inserts are
// committed immediately, and there's no tracking offsets as
// all writes go into a "default" stream that always exists
// for a table.
DefaultStream StreamType = "DEFAULT"

// CommittedStream appends data immediately, but creates a
// discrete stream for the work so that offset tracking can
// be used to track writes.
CommittedStream StreamType = "COMMITTED"

// BufferedStream is a form of checkpointed stream, that allows
// you to advance the offset of visible rows via Flush operations.
BufferedStream StreamType = "BUFFERED"

// PendingStream is a stream in which no data is made visible to
// readers until the stream is finalized and committed explicitly.
PendingStream StreamType = "PENDING"
)

func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
switch t {
case CommittedStream:
return storagepb.WriteStream_COMMITTED
case PendingStream:
return storagepb.WriteStream_PENDING
case BufferedStream:
return storagepb.WriteStream_BUFFERED
default:
return storagepb.WriteStream_TYPE_UNSPECIFIED
}
}

// ManagedStream is the abstraction over a single write stream.
type ManagedStream struct {
streamSettings *streamSettings
destinationTable string
c *Client
}

// streamSettings govern behavior of the append stream RPCs.
type streamSettings struct {

// streamID contains the reference to the destination stream.
streamID string

// streamType governs behavior of the client, such as how
// offset handling is managed.
streamType StreamType

// MaxInflightRequests governs how many unacknowledged
// append writes can be outstanding into the system.
MaxInflightRequests int

// MaxInflightBytes governs how many unacknowledged
// request bytes can be outstanding into the system.
MaxInflightBytes int

// TracePrefix sets a suitable prefix for the trace ID set on
// append requests. Useful for diagnostic purposes.
TracePrefix string
}

func defaultStreamSettings() *streamSettings {
return &streamSettings{
streamType: DefaultStream,
MaxInflightRequests: 1000,
MaxInflightBytes: 0,
TracePrefix: "defaultManagedWriter",
}
}

// StreamName returns the corresponding write stream ID being managed by this writer.
func (ms *ManagedStream) StreamName() string {
return ms.streamSettings.streamID
}

// StreamType returns the configured type for this stream.
func (ms *ManagedStream) StreamType() StreamType {
return ms.streamSettings.streamType
}

// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling
// this method for other stream types yields an error.
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
req := &storagepb.FlushRowsRequest{
WriteStream: ms.streamSettings.streamID,
Offset: &wrapperspb.Int64Value{
Value: offset,
},
}
resp, err := ms.c.rawClient.FlushRows(ctx, req)
if err != nil {
return 0, err
}
return resp.GetOffset(), nil
}

// Finalize is used to mark a stream as complete, and thus ensure no further data can
// be appended to the stream. You cannot finalize a DefaultStream, as it always exists.
//
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
// data in a PendingStream.
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
req := &storagepb.FinalizeWriteStreamRequest{
Name: ms.streamSettings.streamID,
}
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
if err != nil {
return 0, err
}
return resp.GetRowCount(), nil
}