New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bigquery/storage/managedwriter): add base client #4422
Merged
shollyman
merged 10 commits into
googleapis:master
from
shollyman:fr-managedwriter-baseclient
Jul 14, 2021
Merged
Changes from 6 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
4d94327
feat(bigquery/storage/managedwriter): add base client
shollyman 934e641
clarify docstring
shollyman f4e2a37
Merge branch 'master' into fr-managedwriter-baseclient
shollyman 4a847a8
address comment lint errors
shollyman 31e8faa
refactor into an explicit Client and ManagedStream type
shollyman 3a0290f
Merge branch 'master' into fr-managedwriter-baseclient
shollyman d302a58
Merge branch 'master' into fr-managedwriter-baseclient
shollyman 4c303f5
adjust NewManagedStream signature, make dest table optional
shollyman 290f887
update comment
shollyman f1e409d
Merge branch 'master' into fr-managedwriter-baseclient
shollyman File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
// Copyright 2021 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package managedwriter | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"runtime" | ||
"strings" | ||
|
||
"cloud.google.com/go/bigquery" | ||
storage "cloud.google.com/go/bigquery/storage/apiv1beta2" | ||
"google.golang.org/api/option" | ||
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" | ||
) | ||
|
||
// Client is a managed BigQuery Storage write client scoped to a single project. | ||
type Client struct { | ||
rawClient *storage.BigQueryWriteClient | ||
projectID string | ||
} | ||
|
||
// NewClient instantiates a new client. | ||
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) { | ||
numConns := runtime.GOMAXPROCS(0) | ||
if numConns > 4 { | ||
numConns = 4 | ||
} | ||
o := []option.ClientOption{ | ||
option.WithGRPCConnectionPool(numConns), | ||
} | ||
o = append(o, opts...) | ||
|
||
rawClient, err := storage.NewBigQueryWriteClient(ctx, o...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &Client{ | ||
rawClient: rawClient, | ||
projectID: projectID, | ||
}, nil | ||
} | ||
|
||
// NewManagedStream establishes a new stream for writing. | ||
func (c *Client) NewManagedStream(ctx context.Context, table *bigquery.Table, opts ...WriterOption) (*ManagedStream, error) { | ||
|
||
ms := &ManagedStream{ | ||
streamSettings: defaultStreamSettings(), | ||
c: c, | ||
} | ||
|
||
// apply writer options | ||
for _, opt := range opts { | ||
opt(ms) | ||
} | ||
|
||
if ms.streamSettings.streamID == "" && ms.streamSettings.streamType == "" { | ||
return nil, fmt.Errorf("TODO insufficient validation") | ||
shollyman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
if ms.streamSettings.streamID == "" { | ||
// not instantiated with a stream, construct one. | ||
streamName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s/_default", table.ProjectID, table.DatasetID, table.TableID) | ||
if ms.streamSettings.streamType != DefaultStream { | ||
// For everything but a default stream, we create a new stream on behalf of the user. | ||
req := &storagepb.CreateWriteStreamRequest{ | ||
Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", table.ProjectID, table.DatasetID, table.TableID), | ||
WriteStream: &storagepb.WriteStream{ | ||
Type: streamTypeToEnum(ms.streamSettings.streamType), | ||
}} | ||
resp, err := ms.c.rawClient.CreateWriteStream(ctx, req) | ||
if err != nil { | ||
return nil, fmt.Errorf("couldn't create write stream: %v", err) | ||
} | ||
streamName = resp.GetName() | ||
} | ||
ms.streamSettings.streamID = streamName | ||
// TODO(followup CLs): instantiate an appendstream client, flow controller, etc. | ||
} | ||
|
||
return ms, nil | ||
} | ||
|
||
// BatchCommit is used to commit one or more PendingStream streams belonging to the same table | ||
// as a single transaction. Streams must be finalized before committing. | ||
// | ||
// TODO: this currently exposes the raw proto response, but a future CL will wrap this with a nicer type. | ||
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) { | ||
|
||
// determine table from first streamName, as all must share the same table. | ||
if len(streamNames) <= 0 { | ||
return nil, fmt.Errorf("no streamnames provided") | ||
} | ||
|
||
req := &storagepb.BatchCommitWriteStreamsRequest{ | ||
Parent: tableParentFromStreamName(streamNames[0]), | ||
WriteStreams: streamNames, | ||
} | ||
return c.rawClient.BatchCommitWriteStreams(ctx, req) | ||
} | ||
|
||
// getWriteStream returns information about a given write stream. | ||
// | ||
// It is not currently exported because it's unclear what we should surface here to the client, but we can use it for validation. | ||
func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) { | ||
req := &storagepb.GetWriteStreamRequest{ | ||
Name: streamName, | ||
} | ||
return c.rawClient.GetWriteStream(ctx, req) | ||
} | ||
|
||
// tableParentFromStreamName return the corresponding parent table | ||
// identifier given a fully qualified streamname. | ||
func tableParentFromStreamName(streamName string) string { | ||
// Stream IDs have the following prefix: | ||
// projects/{project}/datasets/{dataset}/tables/{table}/blah | ||
parts := strings.SplitN(streamName, "/", 7) | ||
if len(parts) < 7 { | ||
// invalid; just pass back the input | ||
return streamName | ||
} | ||
return strings.Join(parts[:6], "/") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
// Copyright 2021 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package managedwriter | ||
|
||
import "testing" | ||
|
||
func TestTableParentFromStreamName(t *testing.T) { | ||
testCases := []struct { | ||
in string | ||
want string | ||
}{ | ||
{ | ||
"bad", | ||
"bad", | ||
}, | ||
{ | ||
"projects/foo/datasets/bar/tables/baz", | ||
"projects/foo/datasets/bar/tables/baz", | ||
}, | ||
{ | ||
"projects/foo/datasets/bar/tables/baz/zip/zam/zoomie", | ||
"projects/foo/datasets/bar/tables/baz", | ||
}, | ||
{ | ||
"projects/foo/datasets/bar/tables/baz/_default", | ||
"projects/foo/datasets/bar/tables/baz", | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
got := tableParentFromStreamName(tc.in) | ||
if got != tc.want { | ||
t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
// Copyright 2021 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package managedwriter | ||
|
||
import ( | ||
"context" | ||
|
||
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" | ||
"google.golang.org/protobuf/types/known/wrapperspb" | ||
) | ||
|
||
// StreamType indicates the type of stream this write client is managing. | ||
type StreamType string | ||
|
||
var ( | ||
// DefaultStream most closely mimics the legacy bigquery | ||
// tabledata.insertAll semantics. Successful inserts are | ||
// committed immediately, and there's no tracking offsets as | ||
// all writes go into a "default" stream that always exists | ||
// for a table. | ||
DefaultStream StreamType = "DEFAULT" | ||
|
||
// CommittedStream appends data immediately, but creates a | ||
// discrete stream for the work so that offset tracking can | ||
// be used to track writes. | ||
CommittedStream StreamType = "COMMITTED" | ||
|
||
// BufferedStream is a form of checkpointed stream, that allows | ||
// you to advance the offset of visible rows via Flush operations. | ||
BufferedStream StreamType = "BUFFERED" | ||
|
||
// PendingStream is a stream in which no data is made visible to | ||
// readers until the stream is finalized and committed explicitly. | ||
PendingStream StreamType = "PENDING" | ||
) | ||
|
||
func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type { | ||
switch t { | ||
case CommittedStream: | ||
return storagepb.WriteStream_COMMITTED | ||
case PendingStream: | ||
return storagepb.WriteStream_PENDING | ||
case BufferedStream: | ||
return storagepb.WriteStream_BUFFERED | ||
default: | ||
return storagepb.WriteStream_TYPE_UNSPECIFIED | ||
} | ||
} | ||
|
||
// ManagedStream is the abstraction over a single write stream. | ||
type ManagedStream struct { | ||
streamSettings *streamSettings | ||
c *Client | ||
} | ||
|
||
// streamSettings govern behavior of the append stream RPCs. | ||
type streamSettings struct { | ||
|
||
// streamID contains the reference to the destination stream. | ||
streamID string | ||
|
||
// streamType governs behavior of the client, such as how | ||
// offset handling is managed. | ||
streamType StreamType | ||
|
||
// MaxInflightRequests governs how many unacknowledged | ||
// append writes can be outstanding into the system. | ||
MaxInflightRequests int | ||
|
||
// MaxInflightBytes governs how many unacknowledged | ||
// request bytes can be outstanding into the system. | ||
MaxInflightBytes int | ||
|
||
// TracePrefix sets a suitable prefix for the trace ID set on | ||
// append requests. Useful for diagnostic purposes. | ||
TracePrefix string | ||
} | ||
|
||
func defaultStreamSettings() *streamSettings { | ||
return &streamSettings{ | ||
streamType: DefaultStream, | ||
MaxInflightRequests: 1000, | ||
MaxInflightBytes: 0, | ||
TracePrefix: "defaultManagedWriter", | ||
} | ||
} | ||
|
||
// StreamName returns the corresponding write stream ID being managed by this writer. | ||
func (ms *ManagedStream) StreamName() string { | ||
return ms.streamSettings.streamID | ||
} | ||
|
||
// StreamType returns the configured type for this stream. | ||
func (ms *ManagedStream) StreamType() StreamType { | ||
return ms.streamSettings.streamType | ||
} | ||
|
||
// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling | ||
// this method for other stream types yields an error. | ||
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) { | ||
req := &storagepb.FlushRowsRequest{ | ||
WriteStream: ms.streamSettings.streamID, | ||
Offset: &wrapperspb.Int64Value{ | ||
Value: offset, | ||
}, | ||
} | ||
resp, err := ms.c.rawClient.FlushRows(ctx, req) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return resp.GetOffset(), nil | ||
} | ||
|
||
// Finalize is used to mark a stream as complete, and thus ensure no further data can | ||
// be appended to the stream. You cannot finalize a DefaultStream, as it always exists. | ||
// | ||
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit | ||
// data in a PendingStream. | ||
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { | ||
// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in. | ||
req := &storagepb.FinalizeWriteStreamRequest{ | ||
Name: ms.streamSettings.streamID, | ||
} | ||
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return resp.GetRowCount(), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
// Copyright 2021 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package managedwriter | ||
|
||
// WriterOption is used to configure a ManagedWriteClient. | ||
type WriterOption func(*ManagedStream) | ||
|
||
// WithType sets the write type of the new writer. | ||
func WithType(st StreamType) WriterOption { | ||
return func(ms *ManagedStream) { | ||
ms.streamSettings.streamType = st | ||
} | ||
} | ||
|
||
// WithStreamName allows users to set the stream name this writer will | ||
// append to explicitly. By default, the managed client will create the | ||
// stream when instantiated. | ||
// | ||
// Caveat: It is possible to specify stream name and type explicitly, which | ||
// is not validated for correctness. In such cases, behavior is indeterminate. | ||
func WithStreamName(name string) WriterOption { | ||
return func(ms *ManagedStream) { | ||
ms.streamSettings.streamID = name | ||
} | ||
} | ||
|
||
// WithMaxInflightRequests bounds the inflight appends on the write connection. | ||
func WithMaxInflightRequests(n int) WriterOption { | ||
return func(ms *ManagedStream) { | ||
ms.streamSettings.MaxInflightRequests = n | ||
} | ||
} | ||
|
||
// WithMaxInflightBytes bounds the inflight append request bytes on the write connection. | ||
func WithMaxInflightBytes(n int) WriterOption { | ||
return func(ms *ManagedStream) { | ||
ms.streamSettings.MaxInflightBytes = n | ||
} | ||
} | ||
|
||
// WithTracePrefix allows instruments requests to the service with a custom trace prefix. | ||
// This is generally for diagnostic purposes only. | ||
func WithTracePrefix(prefix string) WriterOption { | ||
return func(ms *ManagedStream) { | ||
ms.streamSettings.TracePrefix = prefix | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think table could just be datasetID and tableID. I worry about this client reaching for code in both directions up and down the file structure. This means the core bigquery package could never rely on this package due to cycles.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fine switching it to string, now I'm wondering if it should be a variadic argument. We don't do anything with the reference other than for the construct a stream case, but if we allow users to pass in an optional stream ID perhaps we let them treat table as optional as well? Will switch it and augment the validation (your other comment) and see how it plays.