feat(bigquery/storage/managedwriter): add base client (#4422)
* feat(bigquery/storage/managedwriter): add base client

This PR adds a base client and implements part of the API surface.  The
streaming client abstractions are elided and will be introduced in
subsequent PRs, but this PR does include the non-streaming RPC methods.

Alongside the client, we introduce an option type (WriterOption) for
configuring the managed stream in a variadic fashion.

The managed stream carries an internal settings type, streamSettings,
which holds fields of note for both the streaming client abstraction and
its flow controller.
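
The option file itself is not part of the excerpt below, but since NewManagedStream applies each option directly to the *ManagedStream (see the opt(ms) loop), a WriterOption is presumably a function over that type. A minimal sketch of what such options might look like; WithDestinationTable and WithMaxInflightRequests are illustrative names, not confirmed by this commit:

// Illustrative sketch only; the real option definitions live in a file not shown here.
type WriterOption func(*ManagedStream)

// WithDestinationTable specifies the destination table for appends.
// (Assumed name for illustration.)
func WithDestinationTable(destTable string) WriterOption {
    return func(ms *ManagedStream) {
        ms.destinationTable = destTable
    }
}

// WithMaxInflightRequests caps how many unacknowledged appends may be outstanding.
// (Assumed name for illustration.)
func WithMaxInflightRequests(n int) WriterOption {
    return func(ms *ManagedStream) {
        ms.streamSettings.MaxInflightRequests = n
    }
}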

Testing: This PR contains unit tests, but doesn't include integration
tests.  I'll start adding those soon.

* clarify docstring

* address comment lint errors

* refactor into an explicit Client and ManagedStream type

* adjust NewManagedStream signature, make dest table optional

* update comment
shollyman committed Jul 14, 2021
1 parent 10fd816 commit 4f7193b
Showing 5 changed files with 537 additions and 0 deletions.
161 changes: 161 additions & 0 deletions bigquery/storage/managedwriter/client.go
@@ -0,0 +1,161 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
    "context"
    "fmt"
    "runtime"
    "strings"

    storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
    "google.golang.org/api/option"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
)

// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
    rawClient *storage.BigQueryWriteClient
    projectID string
}

// NewClient instantiates a new client.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
    numConns := runtime.GOMAXPROCS(0)
    if numConns > 4 {
        numConns = 4
    }
    o := []option.ClientOption{
        option.WithGRPCConnectionPool(numConns),
    }
    o = append(o, opts...)

    rawClient, err := storage.NewBigQueryWriteClient(ctx, o...)
    if err != nil {
        return nil, err
    }

    return &Client{
        rawClient: rawClient,
        projectID: projectID,
    }, nil
}

// NewManagedStream establishes a new managed stream for appending data into a table.
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {

    ms := &ManagedStream{
        streamSettings: defaultStreamSettings(),
        c:              c,
    }

    // apply writer options
    for _, opt := range opts {
        opt(ms)
    }

    if err := c.validateOptions(ctx, ms); err != nil {
        return nil, err
    }

    if ms.streamSettings.streamID == "" {
        // not instantiated with a stream, construct one.
        streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
        if ms.streamSettings.streamType != DefaultStream {
            // For everything but a default stream, we create a new stream on behalf of the user.
            req := &storagepb.CreateWriteStreamRequest{
                Parent: ms.destinationTable,
                WriteStream: &storagepb.WriteStream{
                    Type: streamTypeToEnum(ms.streamSettings.streamType),
                }}
            resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
            if err != nil {
                return nil, fmt.Errorf("couldn't create write stream: %v", err)
            }
            streamName = resp.GetName()
        }
        ms.streamSettings.streamID = streamName
        // TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
    }

    return ms, nil
}

// validateOptions is used to validate that we received a sane/compatible set of WriterOptions
// for constructing a new managed stream.
func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
    if ms == nil {
        return fmt.Errorf("no managed stream definition")
    }
    if ms.streamSettings.streamID != "" {
        // User supplied a stream, we need to verify it exists.
        info, err := c.getWriteStream(ctx, ms.streamSettings.streamID)
        if err != nil {
            return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
        }
        // update type and destination based on stream metadata
        ms.streamSettings.streamType = StreamType(info.Type.String())
        ms.destinationTable = tableParentFromStreamName(ms.streamSettings.streamID)
    }
    if ms.destinationTable == "" {
        return fmt.Errorf("no destination table specified")
    }
    // we could auto-select DEFAULT here, but let's force users to be specific for now.
    if ms.StreamType() == "" {
        return fmt.Errorf("stream type wasn't specified")
    }
    return nil
}

// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
// as a single transaction.  Streams must be finalized before committing.
//
// TODO: this currently exposes the raw proto response, but a future CL will wrap this with a nicer type.
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {

    // determine table from first streamName, as all must share the same table.
    if len(streamNames) <= 0 {
        return nil, fmt.Errorf("no streamnames provided")
    }

    req := &storagepb.BatchCommitWriteStreamsRequest{
        Parent:       tableParentFromStreamName(streamNames[0]),
        WriteStreams: streamNames,
    }
    return c.rawClient.BatchCommitWriteStreams(ctx, req)
}

// getWriteStream returns information about a given write stream.
//
// It's primarily used for setup validation, and not exposed directly to end users.
func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) {
    req := &storagepb.GetWriteStreamRequest{
        Name: streamName,
    }
    return c.rawClient.GetWriteStream(ctx, req)
}

// tableParentFromStreamName returns the corresponding parent table
// identifier given a fully qualified stream name.
func tableParentFromStreamName(streamName string) string {
    // Stream IDs have the following prefix:
    // projects/{project}/datasets/{dataset}/tables/{table}/blah
    parts := strings.SplitN(streamName, "/", 7)
    if len(parts) < 7 {
        // invalid; just pass back the input
        return streamName
    }
    return strings.Join(parts[:6], "/")
}
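
Taken together, the client surface above supports a construction flow roughly like the following. This is a sketch only: WithDestinationTable is the illustrative option name assumed earlier, and error handling is reduced to log.Fatal for brevity.

package main

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
    ctx := context.Background()

    // NewClient scopes the write client to a single project and caps the
    // gRPC connection pool at min(GOMAXPROCS, 4), per the code above.
    client, err := managedwriter.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }

    // With no explicit stream type, defaultStreamSettings() selects the
    // default stream, so the writer appends to <table>/_default.
    ms, err := client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"), // illustrative option name
    )
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("writing to stream %q (type %s)", ms.StreamName(), ms.StreamType())
}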
48 changes: 48 additions & 0 deletions bigquery/storage/managedwriter/client_test.go
@@ -0,0 +1,48 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import "testing"

func TestTableParentFromStreamName(t *testing.T) {
    testCases := []struct {
        in   string
        want string
    }{
        {
            "bad",
            "bad",
        },
        {
            "projects/foo/datasets/bar/tables/baz",
            "projects/foo/datasets/bar/tables/baz",
        },
        {
            "projects/foo/datasets/bar/tables/baz/zip/zam/zoomie",
            "projects/foo/datasets/bar/tables/baz",
        },
        {
            "projects/foo/datasets/bar/tables/baz/_default",
            "projects/foo/datasets/bar/tables/baz",
        },
    }

    for _, tc := range testCases {
        got := tableParentFromStreamName(tc.in)
        if got != tc.want {
            t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
        }
    }
}
142 changes: 142 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -0,0 +1,142 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
    "context"

    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// StreamType indicates the type of stream this write client is managing.
type StreamType string

var (
    // DefaultStream most closely mimics the legacy bigquery
    // tabledata.insertAll semantics.  Successful inserts are
    // committed immediately, and there's no tracking of offsets as
    // all writes go into a "default" stream that always exists
    // for a table.
    DefaultStream StreamType = "DEFAULT"

    // CommittedStream appends data immediately, but creates a
    // discrete stream for the work so that offset tracking can
    // be used to track writes.
    CommittedStream StreamType = "COMMITTED"

    // BufferedStream is a form of checkpointed stream that allows
    // you to advance the offset of visible rows via Flush operations.
    BufferedStream StreamType = "BUFFERED"

    // PendingStream is a stream in which no data is made visible to
    // readers until the stream is finalized and committed explicitly.
    PendingStream StreamType = "PENDING"
)

func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
    switch t {
    case CommittedStream:
        return storagepb.WriteStream_COMMITTED
    case PendingStream:
        return storagepb.WriteStream_PENDING
    case BufferedStream:
        return storagepb.WriteStream_BUFFERED
    default:
        return storagepb.WriteStream_TYPE_UNSPECIFIED
    }
}

// ManagedStream is the abstraction over a single write stream.
type ManagedStream struct {
    streamSettings   *streamSettings
    destinationTable string
    c                *Client
}

// streamSettings govern behavior of the append stream RPCs.
type streamSettings struct {

    // streamID contains the reference to the destination stream.
    streamID string

    // streamType governs behavior of the client, such as how
    // offset handling is managed.
    streamType StreamType

    // MaxInflightRequests governs how many unacknowledged
    // append writes can be outstanding into the system.
    MaxInflightRequests int

    // MaxInflightBytes governs how many unacknowledged
    // request bytes can be outstanding into the system.
    MaxInflightBytes int

    // TracePrefix sets a suitable prefix for the trace ID set on
    // append requests.  Useful for diagnostic purposes.
    TracePrefix string
}

func defaultStreamSettings() *streamSettings {
    return &streamSettings{
        streamType:          DefaultStream,
        MaxInflightRequests: 1000,
        MaxInflightBytes:    0,
        TracePrefix:         "defaultManagedWriter",
    }
}

// StreamName returns the corresponding write stream ID being managed by this writer.
func (ms *ManagedStream) StreamName() string {
    return ms.streamSettings.streamID
}

// StreamType returns the configured type for this stream.
func (ms *ManagedStream) StreamType() StreamType {
    return ms.streamSettings.streamType
}

// FlushRows advances the offset at which rows in a BufferedStream are visible.  Calling
// this method for other stream types yields an error.
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
    req := &storagepb.FlushRowsRequest{
        WriteStream: ms.streamSettings.streamID,
        Offset: &wrapperspb.Int64Value{
            Value: offset,
        },
    }
    resp, err := ms.c.rawClient.FlushRows(ctx, req)
    if err != nil {
        return 0, err
    }
    return resp.GetOffset(), nil
}

// Finalize is used to mark a stream as complete, and thus ensure no further data can
// be appended to the stream.  You cannot finalize a DefaultStream, as it always exists.
//
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
// data in a PendingStream.
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
    // TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
    req := &storagepb.FinalizeWriteStreamRequest{
        Name: ms.streamSettings.streamID,
    }
    resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
    if err != nil {
        return 0, err
    }
    return resp.GetRowCount(), nil
}
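
For pending streams, Finalize pairs with the client's BatchCommit: appends accumulate invisibly, the stream is finalized so nothing further can be written, and the rows become visible once the batch commit succeeds. A rough sketch of that lifecycle, again using the illustrative WithDestinationTable/WithType option names (the append path itself is not implemented in this PR):

package example

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery/storage/managedwriter"
)

// commitPending sketches the pending-stream lifecycle described above.
func commitPending(ctx context.Context, client *managedwriter.Client, table string) error {
    ms, err := client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable(table),           // illustrative option name
        managedwriter.WithType(managedwriter.PendingStream), // illustrative option name
    )
    if err != nil {
        return err
    }

    // ... append rows here once the append path lands in a follow-up PR ...

    // Mark the stream complete; no further appends are accepted.
    rowCount, err := ms.Finalize(ctx)
    if err != nil {
        return err
    }
    log.Printf("finalized %d rows on %s", rowCount, ms.StreamName())

    // Atomically make the finalized rows visible in the destination table.
    resp, err := client.BatchCommit(ctx, table, []string{ms.StreamName()})
    if err != nil {
        return err
    }
    log.Printf("committed at %v", resp.GetCommitTime())
    return nil
}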
